Posted to user@flink.apache.org by Flink Developer <de...@protonmail.com> on 2018/10/27 21:07:54 UTC

Flink Kafka to BucketingSink to S3 - S3Exception

Hi, I'm running a Scala Flink app on an AWS EMR cluster (EMR 5.17, Hadoop 2.8.4) with Flink parallelism set to 400. The source is a Kafka topic, and the app sinks to S3 using the layout s3://<day>/<hour>/<worker_number>/<files>. Potentially 400 files are being written simultaneously.

Configuration:
- Flink v1.5.2
- Checkpointing enabled w/ RocksDB (flink-statebackend-rocksdb_2.11, v1.6.1). Interval is 2 mins with max concurrent checkpoints set to 1. Min pause between checkpoints is 2 mins. Timeout is set to 2 mins.
- BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
- Batch file size is set to 5 MB.
- Batch rollover interval is set to 30 min.
- Writer uses GZip compression
- Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1, hadoop-core v1.2.1, hadoop-aws v3.1.1)
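
As a rough sense of scale for the settings above, the sketch below estimates how many part files the sink closes. BucketingSink moves each part file through its in-progress, pending and finished states by renaming it, so every part file implies at least one rename call against the file system. The figure is a lower bound and rests on an assumption not stated in the post: that every one of the 400 subtasks receives data continuously, so each rolls at least once per 30-minute rollover interval (the 5 MB size limit can only add more files). The class and method names are made up for illustration.

```java
// Lower bound on part files (and therefore rename calls) implied by the
// posted settings. Assumption: every subtask is continuously busy, so it
// rolls at least once per rollover interval; size-based rolling only adds more.
class RenameVolumeEstimate {

    // Part files closed per hour across all parallel subtasks.
    static long partFilesPerHour(int parallelism, int rolloverMinutes) {
        return (long) parallelism * (60 / rolloverMinutes);
    }

    // Part files closed per day across all parallel subtasks.
    static long partFilesPerDay(int parallelism, int rolloverMinutes) {
        return 24 * partFilesPerHour(parallelism, rolloverMinutes);
    }

    public static void main(String[] args) {
        // Values taken from the configuration list: parallelism 400, rollover 30 min.
        System.out.println(partFilesPerHour(400, 30) + " part files per hour");
        System.out.println(partFilesPerDay(400, 30) + " part files per day");
    }
}
```

With at least 19,200 part files a day, a failure that hits only about one rename in ten thousand would still be expected to show up daily.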

The app is able to run for hours straight, but occasionally (once or twice a day) it throws the following exception. When this happens, the app is able to recover from the previous checkpoint, but I am concerned about the exception:

Caused by: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx

    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)

Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4365)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4312)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1270)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)

And sometimes, it will show this:

java.lang.RuntimeException: Error while restoring BucketingSink state.
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:888)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:767)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:394)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)

What causes this and how can it be resolved? Thank you.

There seems to be a related Flink ticket and PR here, but I'm not sure if this is the exact same issue and if it has been resolved:
https://issues.apache.org/jira/browse/FLINK-6306
https://github.com/apache/flink/pull/3752
https://github.com/apache/flink/pull/4607
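
The first trace above fails inside rename: BucketingSink.handlePendingInProgressFile calls rename, the EMR file system carries that out as a copy, and the copy starts with a getObjectMetadata call that comes back 404. S3 has no atomic rename, so a "rename" is a metadata lookup plus copy plus delete. The toy model below may help picture how such a sequence can fail when a freshly written object is not yet visible to a metadata lookup. It is a sketch under loud assumptions: the store, its tick-based visibility lag and all names are invented for illustration, it is not S3's actual consistency model, and it is not a confirmed diagnosis of the failure reported here.

```java
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;

// Toy object store in which a new key becomes visible to metadata lookups
// only lagTicks ticks after it was written. Hypothetical model, not S3.
class LaggingObjectStore {
    private final Map<String, byte[]> data = new HashMap<>();
    private final Map<String, Long> visibleFrom = new HashMap<>();
    private final long lagTicks;
    private long now = 0;

    LaggingObjectStore(long lagTicks) {
        this.lagTicks = lagTicks;
    }

    // Advances the logical clock by one tick.
    void tick() {
        now++;
    }

    void put(String key, byte[] bytes) {
        data.put(key, bytes);
        visibleFrom.put(key, now + lagTicks);
    }

    // Analogue of getObjectMetadata: fails while the key is missing or not yet visible.
    int contentLength(String key) throws FileNotFoundException {
        Long from = visibleFrom.get(key);
        if (from == null || now < from) {
            throw new FileNotFoundException("404 Not Found: " + key);
        }
        return data.get(key).length;
    }

    // "Rename" as metadata lookup + copy + delete. None of it is atomic.
    void rename(String src, String dst) throws FileNotFoundException {
        contentLength(src);        // the step that returns 404 in the trace
        put(dst, data.get(src));   // copy
        data.remove(src);          // delete
        visibleFrom.remove(src);
    }

    // Convenience wrapper: true if the rename went through, false on a 404.
    boolean tryRename(String src, String dst) {
        try {
            rename(src, dst);
            return true;
        } catch (FileNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        LaggingObjectStore store = new LaggingObjectStore(2);
        store.put("bucket/part-0.pending", new byte[] {1, 2, 3});
        System.out.println("rename right after write: "
                + store.tryRename("bucket/part-0.pending", "bucket/part-0"));
        store.tick();
        store.tick();
        System.out.println("rename after the lag: "
                + store.tryRename("bucket/part-0.pending", "bucket/part-0"));
    }
}
```

In this model the same rename fails immediately after the write and succeeds two ticks later, which would be consistent with a job that runs for hours and only occasionally trips over the race.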

Re: Flink Kafka to BucketingSink to S3 - S3Exception

Posted by Ravi Bhushan Ratnakar <ra...@gmail.com>.
Hi there,

some questions:

   1. Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop,
   flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs)?
   If so, could you please share your dependency versioning?

[Ravi]- I am using AWS EMR 5.18, which supports Flink 1.6.0. On EMR, all
the Hadoop and AWS related dependencies were already available, so I
explicitly excluded them from the transitive dependencies of Flink modules
such as flink-s3-fs-hadoop.
I was also facing an issue using flink-s3-fs-hadoop on EMR (a class conflict
due to the shaded AWS API), so I unpacked the jar and included the required
classes in the project.

   2. Does this use a kafka source with high flink parallelism (~400) for
   all kafka partitions and does it run continuously for several days?

[Ravi]- I am using a Kinesis source with parallelism 640. So far, we have
not been able to run for more than 20 hours.

   3. Could you please share your checkpoint interval configuration, batch
   file size, batch rollover interval configuration, and sink prefix (s3:// ,
   s3a://)

[Ravi]- After a couple of rounds of testing, we realized that checkpoints
require more resources in proportion to the outage duration, and we faced
several checkpoint-related issues. So we finally decided not to use
checkpoints, as our state size for 5-minute windows is around 40 GB.
Checkpoint interval - 10 sec, sink prefix s3a://, batch file size 256 MB,
rollover inactivity 60 secs

Thank you

On Mon 5 Nov, 2018, 17:24 Addison Higham <addisonj@gmail.com> wrote:

> Hi there,
>
> This is going to be a bit of a long post, but I think there has been a lot
> of confusion around S3, so I am going to go over everything I know in hopes
> that helps.
>
> As mentioned by Rafi, the BucketingSink does not work for file systems
> like S3, because it makes some assumptions that are incorrect for
> eventually consistent file systems as well as for file systems that
> don't have certain atomic operations, which leads to inconsistency (see
> https://issues.apache.org/jira/browse/FLINK-6306). This has been poorly
> documented, so I think a lot of people have tried to use S3 only to face
> issues. There is a plan for moving forward, however.
>
> That plan does NOT include "fixing" the BucketingSink, though. Instead, a
> new API, the StreamingFileSink, first introduced in Flink 1.6, is the
> replacement for BucketingSink and is planned to (eventually) fix the
> problem. The first release of StreamingFileSink in the 1.6 branch didn't
> support S3. This was originally seen as a bug that would be fixed in Flink
> 1.6.2; however, once all the work was done to add support for S3, it seems
> it was decided not to backport the fix (see this thread:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-9752-s3-recoverable-writer-not-actually-fixed-in-1-6-2-td24925.html).
> This means that Flink 1.6.2 does NOT fix the S3 issue, but the fix will be
> included in 1.7, which is currently in feature freeze and will hopefully
> have an RC in the next couple of weeks.
>
> But yes, if you need S3 support ASAP, you are in a bit of a pickle. My
> team is in that situation, so these are the options as we saw them:
>
> 0. Wait for Flink 1.7
> 1. Build and run your own Flink from the master or flink-1.7 branch, which
> has support for S3 and StreamingFileSink
> 2. Write our own custom sink for S3 (probably with some caveats)
> 3. Backport the changes into Flink 1.6
>
> We really didn't want to wait for 1.7, as that would hurt our delivery
> timeline. We didn't love the idea of running a fun unreleased version of
> Flink in production either. As we looked into writing something
> ourselves, it became clear pretty quickly that we could fairly easily get
> a sink with at-least-once delivery to a file, but exactly-once delivery
> would be significantly more difficult. At-least-once is actually okay for
> our use case, but we decided we would rather not have to revisit this
> later on, change all the code, and then run a one-off job to remove
> dupes. Instead, we decided to backport the changes into the 1.6 branch.
> Luckily, we already build our own Flink, so we had that tooling already.
> The backport took a few hours (it was fairly complicated to get all the
> changes), but we seem to have gotten everything working. The backport is
> here: https://github.com/instructure/flink/tree/s3_recover_backport. Our
> plan is to use that backport until 1.7 is stable, then we can upgrade
> without (hopefully) having to change any code. We still recognize there
> is a possibility of bugs in the backport, but for us that is mitigated by
> the fact that we are okay with at-least-once and, if all else fails, we
> have a period of transition where this data is also being written to
> another location we can fall back to.
>
> So yeah, to reiterate, no out-of-the-box S3 stuff works ATM, but that
> should hopefully be fixed *soon*. If you can wait, that is the easiest; if
> you can't, building either your own custom sink or your own Flink with the
> backport isn't a terrible option.
>
> Hope that helps!
>
> Addison
>
>
>
>
> On Sun, Nov 4, 2018 at 3:09 AM Flink Developer <
> developer143@protonmail.com> wrote:
>
>> Hi Ravi, some questions:
>>
>>    1. Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop,
>>    flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs,
>>    hadoop-common) ? If so, could you please share your dependency versioning?
>>    2. Does this use a kafka source with high flink parallelism (~400)
>>    for all kafka partitions and does it run continuously for several days?
>>    3. Could you please share your checkpoint interval configuration,
>>    batch file size, batch rollover interval configuration, and sink prefix
>>    (s3:// ,  s3a://)
>>
>> Thank you
>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>> On Saturday, November 3, 2018 7:18 AM, Ravi Bhushan Ratnakar <
>> ravibhushanratnakar@gmail.com> wrote:
>>
>> I have made a few small changes to BucketingSink and implemented them as
>> a new CustomBucketingSink for use in my project, which works fine with
>> the s3 and s3a protocols. This implementation doesn't require XML file
>> configuration; instead, it uses the configuration provided through the
>> Flink configuration object by calling the setConfig method of BucketingSink.
>>
>> On Sat 3 Nov, 2018, 09:24 Flink Developer <developer143@protonmail.com>
>> wrote:
>>
>>> It seems the issue also appears when using Flink version 1.6.2.
>>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>>> On Tuesday, October 30, 2018 10:26 PM, Flink Developer <
>>> developer143@protonmail.com> wrote:
>>>
>>> Hi, thanks for the info Rafi, that seems to be related.  I hope *Flink
>>> version 1.6.2* fixes this. Has anyone encountered this before?
>>>
>>> I would also like to note that my jar includes a core-site.xml file
>>> that uses s3a. Is this the recommended configuration to use with
>>> BucketingSink? Should the sink be specified using
>>> s3a://<bucket>/<prefix> or s3://<bucket>/<prefix>?
>>>
>>>    <configuration>
>>>        <property>
>>>            <name>fs.s3.impl</name>
>>>            <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>>>        </property>
>>>        <property>
>>>            <name>fs.s3a.buffer.dir</name>
>>>            <value>/tmp</value>
>>>        </property>
>>>        <property>
>>>            <name>fs.s3a.access.key</name>
>>>            <value>xxxxx</value>
>>>        </property>
>>>        <property>
>>>            <name>fs.s3a.secret.key</name>
>>>            <value>xxxxx</value>
>>>        </property>
>>>    </configuration>
>>>
>>>
>>> And my pom.xml uses:
>>>
>>>    - <artifactId>flink-s3-fs-hadoop</artifactId>
>>>    - ...
>>>    - <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>>    - ...
>>>    - <artifactId>hadoop-hdfs</artifactId>
>>>    - ...
>>>    - <artifactId>hadoop-common</artifactId>
>>>    - ...
>>>    - <artifactId>hadoop-core</artifactId>
>>>    - ...
>>>    - <artifactId>hadoop-aws</artifactId>
>>>    - ...
>>>
>>>
>>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>>> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch <ra...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I'm also experiencing this with Flink 1.5.2. This is probably related to
>>> BucketingSink not working properly with S3 as filesystem because of the
>>> eventual-consistency of S3.
>>>
>>> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be
>>> part of 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and
>>> not presto).
>>>
>>> Does anyone know if this fix would solve this issue?
>>>
>>> Thanks,
>>> Rafi
>>>
>>>

Re: Flink Kafka to BucketingSink to S3 - S3Exception

Posted by Flink Developer <de...@protonmail.com>.
Thank you Addison and Ravi for the detailed info.

Hi Addison, it sounds like StreamingFileSink is promising and will be available in Flink 1.7. From the mailing list, it looks like a Flink 1.7 RC is now available for use.
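
For readers who have not used it yet, a minimal StreamingFileSink job can be sketched roughly as follows. This is not Addison's configuration and nothing in it has been run against S3. It assumes a Flink version whose StreamingFileSink supports S3 (1.7, per the discussion in this thread) with flink-streaming-java and the flink-s3-fs-hadoop file system on the classpath. The bucket path and job name are placeholders, and the fromElements source merely stands in for a Kafka source.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Sketch only: requires Flink on the classpath; path and job name are placeholders.
class StreamingFileSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // StreamingFileSink finalizes part files on successful checkpoints,
        // so checkpointing has to be enabled (2 minutes, as in the original post).
        env.enableCheckpointing(2 * 60 * 1000L);

        // Stand-in for the Kafka source used in the thread.
        DataStream<String> lines = env.fromElements("a", "b", "c");

        // Row-format sink with the default bucket assigner and rolling policy.
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/output"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        lines.addSink(sink);
        env.execute("streaming-file-sink-sketch");
    }
}
```

The defaults are left untouched here on purpose; bucket layout and rolling behaviour would need tuning for a job with parallelism in the hundreds.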

Some questions for you: in your use case, is your source Kafka, and is the Flink app running with high parallelism (>300)? Are you able to run with StreamingFileSink to S3 for multiple days without failure? When using StreamingFileSink, what configuration did you use? Thank you.
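
Addison's message earlier in this thread mentions that a hand-written S3 sink would give at-least-once delivery and would later need a one-off job to remove duplicates. A minimal sketch of such a pass is shown below. It assumes, hypothetically, that every record is a text line of the form "id,payload" with a unique id per logical record; the thread does not say what the records look like, and the class and method names are invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// One-off de-duplication pass over records written at least once.
// Assumption: each line is "id,payload" and the id is unique per logical record.
class DedupeSketch {

    // Keeps the first line seen for each id, preserving input order.
    static List<String> dedupeById(List<String> lines) {
        Map<String, String> firstById = new LinkedHashMap<>();
        for (String line : lines) {
            String id = line.substring(0, line.indexOf(','));
            firstById.putIfAbsent(id, line);
        }
        return new ArrayList<>(firstById.values());
    }

    public static void main(String[] args) {
        // Records 1 and 2 were replayed after a simulated restart.
        List<String> written = Arrays.asList("1,a", "2,b", "1,a", "3,c", "2,b");
        for (String line : dedupeById(written)) {
            System.out.println(line);
        }
    }
}
```

An in-memory map is only workable for small outputs; at the data volumes discussed in this thread the same idea would have to run as a batch job keyed by id.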

‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On Monday, November 5, 2018 8:23 AM, Addison Higham <ad...@gmail.com> wrote:

> Hi there,
>
> This is going to be a bit of a long post, but I think there has been a lot of confusion around S3, so I am going to go over everything I know in hopes that helps.
>
> As mentioned by Rafi, The BucketingSink does not work for file systems like S3, as the bucketing sink makes some assumptions that are incorrect for eventually consistent file systems as well as for file systems that don't have certain atomic operations, which leads to inconsistency (see https://issues.apache.org/jira/browse/FLINK-6306). This has been poorly documented in the docs, so I think a lot of people have tried to use s3 only to face issues. There is a plan for moving forward however.
>
> However, that plan does NOT include "fixing" the BucketingSink. Instead, a new API - the StreamingFileSink - is the replacement for BucketingSink, which was first introduced in Flink 1.6 is planned to (eventually) fix the problem. The first release of StreamingFileSink in the 1.6 branch didn't support S3. This was originally seen as a bug that would be fixed in Flink 1.6.2, however, once all the work was done to add support for S3, it seems it was decided not to backport the fix (see this thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-9752-s3-recoverable-writer-not-actually-fixed-in-1-6-2-td24925.html). This means that flink 1.6.2 does NOT fix the S3 issue, but the fix will be included in 1.7, which is currently in feature freeze and will hopefully have an RC in the next couple of weeks.
>
> But yes, if you need S3 support ASAP, you are in a bit of a pickle. My team is in that situation, so this the options as we saw them:
>
> 0. Wait for flink 1.7
> 1. Run and build your own flink from master or flink-1.7 branches which has support for S3 and StreamingFileSink
> 2. Write our own custom sink for s3 (probably with some caveats)
> 3. Backport the changes into flink 1.6
>
> We really didn't want to wait for 1.7, as that would make our delivery timeline not great. We didn't love the idea of running a fun unreleased version of flink in production either. As we looked into writing something ourselves, it became clear
> pretty quick that we could fairly easily get an output sink to a file that would be at-least-once delivery to a file, but exactly-once delivery would be significantly more difficult. That is actually okay for our use case, but we decided we would rather not have to
> revisit this later on and change all the code and then run a one-off job to remove dupes. Instead, we decided to backport the changes into 1.6 branch. Luckily, we already build our own flink, so we had that tooling already. The backport took a few hours (it was fairly complicated to get all the changes), but we seem to got everything
> working. The backport is here: https://github.com/instructure/flink/tree/s3_recover_backport. Our plan is to use that backport until 1.7 is stable, then we can upgrade without (hopefully) having to change any code. We still recognize there is a possibility for bugs in the backport, but
> for us that is mitigated by the fact that we are okay with at-least-once and if all else fails, we have a period of transition where we have this data being written in another location we can fall back to.
>
> So yeah, to reiterate, no out-of-the-box S3 stuff works ATM, but that should hopefully be fixed *soon*. If you can wait, that is the easiest, if you can't, building either your own custom sink or your own flink with the backport isn't a terrible option.
>
> Hope that helps!
>
> Adddison
>
> On Sun, Nov 4, 2018 at 3:09 AM Flink Developer <de...@protonmail.com> wrote:
>
>> Hi Ravi, some questions:
>>
>> - Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop, flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs, hadoop-common) ? If so, could you please share your dependency versioning?
>> - Does this use a kafka source with high flink parallelism (~400) for all kafka partitions and does it run continuously for several days?
>> - Could you please share your checkpoint interval configuration, batch file size, batch rollover interval configuration, and sink prefix (s3:// ,  s3a://)
>>
>> Thank you
>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>> On Saturday, November 3, 2018 7:18 AM, Ravi Bhushan Ratnakar <ra...@gmail.com> wrote:
>>
>>> I have done little changes in BucketingSink and implemented as new CustomBucketingSink to use in my project which works fine with s3 and s3a protocol.  This implementation doesn't require xml file configuration, rather than it uses configuration provided using flink configuration object by calling setConfig method of BucketingSink.
>>>
>>> On Sat 3 Nov, 2018, 09:24 Flink Developer <developer143@protonmail.com wrote:
>>>
>>>> It seems the issue also appears when using Flink version 1.6.2 .
>>>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>>>> On Tuesday, October 30, 2018 10:26 PM, Flink Developer <de...@protonmail.com> wrote:
>>>>
>>>>> Hi, thanks for the info Rafi, that seems to be related.  I hope Flink version 1.6.2 fixes this. Has anyone encountered this before?
>>>>>
>>>>> I would also like to note that my jar includes a core-site.xml file that uses *s3a*. Is this the recommended configuration to use with BucketingSink?   Should the sink be specified using s3a://<bucket>/<prefix> or  s3://<bucket>/<prefix> ?
>>>>>
>>>>> - <configuration>
>>>>> -     <property>
>>>>> -         <name>fs.s3.impl</name>
>>>>> -         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>>>>> -     </property>
>>>>> -     <property>
>>>>> -         <name>fs.s3a.buffer.dir</name>
>>>>> -         <value>/tmp</value>
>>>>> -     </property>
>>>>> -     <property>
>>>>> -         <name>fs.s3a.access.key</name>
>>>>> -         <value>xxxxx</value>
>>>>> -     </property>
>>>>> -     <property>
>>>>> -         <name>fs.s3a.secret.key</name>
>>>>> -         <value>xxxxx</value>
>>>>> -     </property>
>>>>> -     <property>
>>>>> -         <name>fs.s3a.buffer.dir</name>
>>>>> -         <value>/tmp</value>
>>>>> -     </property>
>>>>> - </configuration>
>>>>>
>>>>> And my pom.xml uses:
>>>>>
>>>>> - <artifactId>flink-s3-fs-hadoop</artifactId>
>>>>> - ...
>>>>> - <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>>>> - ...
>>>>> - <artifactId>hadoop-hdfs</artifactId>
>>>>> - ...
>>>>> - <artifactId>hadoop-common</artifactId>
>>>>> - ...
>>>>> - <artifactId>hadoop-core</artifactId>
>>>>> - ...
>>>>> - <artifactId>hadoop-aws</artifactId>
>>>>> - ...
>>>>>
>>>>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>>>>> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch <ra...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm also experiencing this with Flink 1.5.2. This is probably related to BucketingSink not working properly with S3 as filesystem because of the eventual-consistency of S3.
>>>>>>
>>>>>> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part of 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and not presto).
>>>>>>
>>>>>> Does anyone know if this fix would solve this issue?
>>>>>>
>>>>>> Thanks,
>>>>>> Rafi
>>>>>>
>>>>>> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer <de...@protonmail.com> wrote:
>>>>>>
>>>>>>> Hi, I'm running a scala flink app in an AWS EMR cluster (emr 5.17, hadoop 2.8.4)  with flink parallelization set to 400. The source is a Kafka topic and sinks to S3 in the format of: s3://<day>/<hour>/<worker_number>/<files>. There's potentially 400 files writing simultaneously.
>>>>>>>
>>>>>>> Configuration:
>>>>>>> - Flink v1.5.2
>>>>>>> - Checkpointing enabled w/ RocksDb (flink-statebackend-rocksdb_2.11, v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause between checkpoints in 2 mins. Timeout is set to 2 mins.
>>>>>>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>>>>>>> - Batch file size is set to 5mb.
>>>>>>> - Batch rollover interval is set to 30min
>>>>>>> - Writer uses GZip compression
>>>>>>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1, hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>>>>>>
>>>>>>> The app is able to run for hours straight, but occasionally (once or twice a day), it displays the following exception. When this happens, the app is able to recover from previous checkpoint, but I am concerned about the exception:
>>>>>>>
>>>>>>> Caused by: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)
>>>>>>>
>>>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)
>>>>>>>
>>>>>>> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4365)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4312)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1270)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)
>>>>>>>
>>>>>>> - at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)
>>>>>>>
>>>>>>> And sometimes, it will show this:
>>>>>>>
>>>>>>> - java.lang.RuntimeException: Error while restoring BucketingSink state.
>>>>>>>
>>>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:888)
>>>>>>>
>>>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:767)
>>>>>>>
>>>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:394)
>>>>>>>
>>>>>>> - at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>>>>>>
>>>>>>> - at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>>>>>>
>>>>>>> - at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>>>>>
>>>>>>> - at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>>>>>>>
>>>>>>> - at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>>>>>>>
>>>>>>> - at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>>>>>>>
>>>>>>> - at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>>>
>>>>>>> What causes this and how can it be resolved? Thank you.
>>>>>>>
>>>>>>> There seems to be a related Flink ticket and PR here, but I'm not sure if this is the exact same issue and if it has been resolved:
>>>>>>> https://issues.apache.org/jira/browse/FLINK-6306
>>>>>>> https://github.com/apache/flink/pull/3752
>>>>>>> https://github.com/apache/flink/pull/4607

Re: Flink Kafka to BucketingSink to S3 - S3Exception

Posted by Addison Higham <ad...@gmail.com>.
Hi there,

This is going to be a bit of a long post, but I think there has been a lot
of confusion around S3, so I am going to go over everything I know in hopes
that helps.

As mentioned by Rafi, the BucketingSink does not work for file systems like
S3. It makes assumptions that do not hold for eventually consistent file
systems, or for file systems that lack certain atomic operations, and this
leads to inconsistency (see
https://issues.apache.org/jira/browse/FLINK-6306). This has been poorly
documented, so I think a lot of people have tried to use S3 only to run
into issues. There is a plan for moving forward, however.
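
To make the failure mode above concrete, here is a toy Java sketch (standard
library only, no Flink or AWS code). It assumes a store where a newly written
object only becomes visible after some delay, and where "rename" is a
non-atomic lookup, copy, and delete. The names RenameDemo, LaggingObjectStore
and writeThenRename are made up for illustration, and the model is a
simplification, not how S3 or EMRFS actually behaves internally:

```java
import java.util.HashMap;
import java.util.Map;

class RenameDemo {
    /** Toy in-memory object store: a write becomes visible only after lagTicks. */
    static class LaggingObjectStore {
        private final Map<String, String> objects = new HashMap<>();
        private final Map<String, Long> visibleAt = new HashMap<>();
        private final long lagTicks;
        private long clock = 0;

        LaggingObjectStore(long lagTicks) {
            this.lagTicks = lagTicks;
        }

        /** Advances the logical clock by n ticks. */
        void tick(long n) {
            clock += n;
        }

        void put(String key, String body) {
            objects.put(key, body);
            visibleAt.put(key, clock + lagTicks);
        }

        /** Metadata lookup: true only once the write has become visible. */
        boolean exists(String key) {
            Long at = visibleAt.get(key);
            return at != null && clock >= at;
        }

        /** Non-atomic "rename": look up the source, copy it, delete the source. */
        void rename(String src, String dst) {
            if (!exists(src)) {
                throw new IllegalStateException("404 Not Found: " + src);
            }
            put(dst, objects.get(src));
            objects.remove(src);
            visibleAt.remove(src);
        }
    }

    /** Writes a file, waits waitTicks, renames it; returns "ok" or the error message. */
    static String writeThenRename(long lagTicks, long waitTicks) {
        LaggingObjectStore store = new LaggingObjectStore(lagTicks);
        store.put("bucket/part-0.in-progress", "records");
        store.tick(waitTicks);
        try {
            store.rename("bucket/part-0.in-progress", "bucket/part-0");
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Rename right after the write: the source is not visible yet.
        System.out.println(writeThenRename(3, 0));
        // Rename after the lag has passed: the source is visible.
        System.out.println(writeThenRename(3, 3));
    }
}
```

In this model a rename attempted right after the write fails with a 404-style
error, while the same rename after the lag succeeds. That is roughly the shape
of the stack trace in the original report (rename -> copy ->
getObjectMetadata -> 404).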

That plan does NOT include "fixing" the BucketingSink. Instead, a new
API, the StreamingFileSink, was introduced in Flink 1.6 as the replacement
for BucketingSink and is planned to (eventually) fix the problem. The first
release of StreamingFileSink in the 1.6 branch didn't support S3. This was
originally seen as a bug that would be fixed in Flink 1.6.2. However, once
all the work was done to add support for S3, it seems it was decided not to
backport the fix (see this thread:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/FLINK-9752-s3-recoverable-writer-not-actually-fixed-in-1-6-2-td24925.html).
This means that Flink 1.6.2 does NOT fix the S3 issue, but the fix will be
included in 1.7, which is currently in feature freeze and will hopefully
have an RC in the next couple of weeks.

But yes, if you need S3 support ASAP, you are in a bit of a pickle. My team
is in that situation, so these are the options as we saw them:

0. Wait for Flink 1.7
1. Build and run your own Flink from the master or flink-1.7 branch, which
have support for S3 and StreamingFileSink
2. Write our own custom sink for S3 (probably with some caveats)
3. Backport the changes into Flink 1.6

We really didn't want to wait for 1.7, as that would make our delivery
timeline not great. We didn't love the idea of running an unreleased
version of Flink in production either. As we looked into writing something
ourselves, it became clear pretty quickly that we could fairly easily get
a sink that would give at-least-once delivery to a file, but that
exactly-once delivery would be significantly more difficult. At-least-once
is actually okay for our use case, but we decided we would rather not have
to revisit this later on, change all the code, and then run a one-off job
to remove dupes. Instead, we decided to backport the changes into the 1.6
branch. Luckily, we already build our own Flink, so we had that tooling
already. The backport took a few hours (it was fairly complicated to get
all the changes), but we seem to have gotten everything working. The
backport is here:
https://github.com/instructure/flink/tree/s3_recover_backport. Our plan is
to use that backport until 1.7 is stable, then we can upgrade without
(hopefully) having to change any code. We still recognize there is a
possibility of bugs in the backport, but for us that is mitigated by the
fact that we are okay with at-least-once and, if all else fails, we have a
period of transition where this data is also being written to another
location we can fall back to.
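
As a rough illustration of why at-least-once output leaves duplicates behind,
here is a small self-contained Java sketch. It assumes every record carries a
unique ID (here the record string itself) and that a restart replays
everything after the last checkpoint. AtLeastOnceDemo, runWithOneFailure and
dedupe are hypothetical names, not part of Flink or of the backport:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

class AtLeastOnceDemo {
    /**
     * Writes source[0..failAt) to the output, then "crashes" and replays from
     * the last checkpointed offset. Assumes 0 <= failAt <= source.size() and
     * checkpointEvery > 0.
     */
    static List<String> runWithOneFailure(List<String> source, int checkpointEvery, int failAt) {
        List<String> output = new ArrayList<>();
        for (int i = 0; i < failAt; i++) {
            output.add(source.get(i));
        }
        // The last completed checkpoint is at the largest multiple of
        // checkpointEvery that is <= failAt.
        int lastCheckpoint = (failAt / checkpointEvery) * checkpointEvery;
        // Records written after that checkpoint are written again on replay.
        for (int i = lastCheckpoint; i < source.size(); i++) {
            output.add(source.get(i));
        }
        return output;
    }

    /** One-off cleanup: keeps the first occurrence of each record, in order. */
    static List<String> dedupe(List<String> records) {
        return new ArrayList<>(new LinkedHashSet<>(records));
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("r0", "r1", "r2", "r3", "r4", "r5");
        List<String> written = runWithOneFailure(source, 2, 3);
        System.out.println(written);         // [r0, r1, r2, r2, r3, r4, r5]
        System.out.println(dedupe(written)); // [r0, r1, r2, r3, r4, r5]
    }
}
```

Nothing is lost after the simulated crash, but r2 shows up twice, which is the
kind of cleanup a later dedupe job would have to do. An exactly-once sink has
to avoid producing that second r2 in the first place, which is the harder
part.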

So yeah, to reiterate, no out-of-the-box S3 stuff works ATM, but that
should hopefully be fixed *soon*. If you can wait, that is the easiest
option; if you can't, building either your own custom sink or your own
Flink with the backport isn't a terrible option.

Hope that helps!

Addison




On Sun, Nov 4, 2018 at 3:09 AM Flink Developer <de...@protonmail.com>
wrote:

> Hi Ravi, some questions:
>
>    1. Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop,
>    flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs,
>    hadoop-common) ? If so, could you please share your dependency versioning?
>    2. Does this use a kafka source with high flink parallelism (~400) for
>    all kafka partitions and does it run continuously for several days?
>    3. Could you please share your checkpoint interval configuration,
>    batch file size, batch rollover interval configuration, and sink prefix
>    (s3:// ,  s3a://)
>
> Thank you
> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
> On Saturday, November 3, 2018 7:18 AM, Ravi Bhushan Ratnakar <
> ravibhushanratnakar@gmail.com> wrote:
>
> I have done little changes in BucketingSink and implemented as new
> CustomBucketingSink to use in my project which works fine with s3 and s3a
> protocol.  This implementation doesn't require xml file configuration,
> rather than it uses configuration provided using flink configuration object
> by calling setConfig method of BucketingSink.
>
> On Sat 3 Nov, 2018, 09:24 Flink Developer <developer143@protonmail.com
> wrote:
>
>> It seems the issue also appears when using
>> *Flink version 1.6.2 . *
>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>> On Tuesday, October 30, 2018 10:26 PM, Flink Developer <
>> developer143@protonmail.com> wrote:
>>
>> Hi, thanks for the info Rafi, that seems to be related.  I hope *Flink
>> version 1.6.2* fixes this. Has anyone encountered this before?
>>
>> I would also like to note that my jar includes a *core-site.xml* file
>> that uses **s3a**. Is this the recommended configuration to use with
>> BucketingSink?   Should the sink be specified using
>> *s3a://<bucket>/<prefix>* or  *s3://<bucket>/<prefix> *?
>>
>>    - <configuration>
>>    -     <property>
>>    -         <name>fs.s3.impl</name>
>>    -         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>>    -     </property>
>>    -     <property>
>>    -         <name>fs.s3a.buffer.dir</name>
>>    -         <value>/tmp</value>
>>    -     </property>
>>    -     <property>
>>    -         <name>fs.s3a.access.key</name>
>>    -         <value>xxxxx</value>
>>    -     </property>
>>    -     <property>
>>    -         <name>fs.s3a.secret.key</name>
>>    -         <value>xxxxx</value>
>>    -     </property>
>>    -     <property>
>>    -         <name>fs.s3a.buffer.dir</name>
>>    -         <value>/tmp</value>
>>    -     </property>
>>    - </configuration>
>>
>>
>> And my pom.xml uses:
>>
>>    - <artifactId>flink-s3-fs-hadoop</artifactId>
>>    - ...
>>    - <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>    - ...
>>    - <artifactId>hadoop-hdfs</artifactId>
>>    - ...
>>    - <artifactId>hadoop-common</artifactId>
>>    - ...
>>    - <artifactId>hadoop-core</artifactId>
>>    - ...
>>    - <artifactId>hadoop-aws</artifactId>
>>    - ...
>>
>>
>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch <ra...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I'm also experiencing this with Flink 1.5.2. This is probably related to
>> BucketingSink not working properly with S3 as filesystem because of the
>> eventual-consistency of S3.
>>
>> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part
>> of 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and
>> not presto).
>>
>> Does anyone know if this fix would solve this issue?
>>
>> Thanks,
>> Rafi
>>
>>
>> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer <
>> developer143@protonmail.com> wrote:
>>
>>> Hi, I'm running a scala flink app in an AWS EMR cluster (emr 5.17,
>>> hadoop 2.8.4)  with flink parallelization set to 400. The source is a Kafka
>>> topic and sinks to S3 in the format of:
>>> s3://<day>/<hour>/<worker_number>/<files>. There's potentially 400 files
>>> writing simultaneously.
>>>
>>> *Configuration:*
>>> - Flink v1.5.2
>>> - Checkpointing enabled w/ RocksDb (flink-statebackend-rocksdb_2.11,
>>> v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause
>>> between checkpoints in 2 mins. Timeout is set to 2 mins.
>>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>>> - Batch file size is set to 5mb.
>>> - Batch rollover interval is set to 30min
>>> - Writer uses GZip compression
>>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1,
>>> hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>>
>>> The app is able to run for hours straight, but occasionally (once or
>>> twice a day), it displays the following exception. When this happens, the
>>> app is able to recover from previous checkpoint, but I am concerned about
>>> the exception:
>>>
>>> *Caused by: java.io.IOException:
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>> Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID:
>>> xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx*
>>>
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)*
>>>    - *at
>>>    org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)*
>>>
>>> *Caused by:
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>> Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID:
>>> xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx*
>>>
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4365)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4312)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1270)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)*
>>>    - *at
>>>    com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)*
>>>
>>>
>>> *And sometimes, it will show this:*
>>>
>>>    - *java.lang.RuntimeException: Error while restoring BucketingSink
>>>    state.*
>>>    - *at
>>>    org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:888)*
>>>    - *at
>>>    org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:767)*
>>>    - *at
>>>    org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:394)*
>>>    - *at
>>>    org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)*
>>>    - *at
>>>    org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)*
>>>    - *at
>>>    org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)*
>>>    - *at
>>>    org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)*
>>>    - *at
>>>    org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)*
>>>    - *at
>>>    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)*
>>>    - *at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)*
>>>
>>>
>>> What causes this and how can it be resolved? Thank you.
>>>
>>> There seems to be a related Flink ticket and PR here, but I'm not sure
>>> if this is the exact same issue and if it has been resolved:
>>> https://issues.apache.org/jira/browse/FLINK-6306
>>> https://github.com/apache/flink/pull/3752
>>> https://github.com/apache/flink/pull/4607
>>>
>>
>>
>>
>

Re: Flink Kafka to BucketingSink to S3 - S3Exception

Posted by Flink Developer <de...@protonmail.com>.
Hi Ravi, some questions:

- Is this using Flink 1.6.2 with dependencies (flink-s3-fs-hadoop, flink-statebackend-rocksdb, hadoop-common, hadoop-aws, hadoop-hdfs)? If so, could you please share your dependency versions?
- Does this use a Kafka source with high Flink parallelism (~400) for all Kafka partitions, and does it run continuously for several days?
- Could you please share your checkpoint interval configuration, batch file size, batch rollover interval configuration, and sink prefix (s3:// or s3a://)?

Thank you
‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On Saturday, November 3, 2018 7:18 AM, Ravi Bhushan Ratnakar <ra...@gmail.com> wrote:

> I have done little changes in BucketingSink and implemented as new CustomBucketingSink to use in my project which works fine with s3 and s3a protocol.  This implementation doesn't require xml file configuration, rather than it uses configuration provided using flink configuration object by calling setConfig method of BucketingSink.
>
> On Sat 3 Nov, 2018, 09:24 Flink Developer <developer143@protonmail.com wrote:
>
>> It seems the issue also appears when using Flink version 1.6.2 .
>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>> On Tuesday, October 30, 2018 10:26 PM, Flink Developer <de...@protonmail.com> wrote:
>>
>>> Hi, thanks for the info Rafi, that seems to be related.  I hope Flink version 1.6.2 fixes this. Has anyone encountered this before?
>>>
>>> I would also like to note that my jar includes a core-site.xml file that uses *s3a*. Is this the recommended configuration to use with BucketingSink?   Should the sink be specified using s3a://<bucket>/<prefix> or  s3://<bucket>/<prefix> ?
>>>
>>> - <configuration>
>>> -     <property>
>>> -         <name>fs.s3.impl</name>
>>> -         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.buffer.dir</name>
>>> -         <value>/tmp</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.access.key</name>
>>> -         <value>xxxxx</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.secret.key</name>
>>> -         <value>xxxxx</value>
>>> -     </property>
>>> -     <property>
>>> -         <name>fs.s3a.buffer.dir</name>
>>> -         <value>/tmp</value>
>>> -     </property>
>>> - </configuration>
>>>
>>> And my pom.xml uses:
>>>
>>> - <artifactId>flink-s3-fs-hadoop</artifactId>
>>> - ...
>>> - <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>>> - ...
>>> - <artifactId>hadoop-hdfs</artifactId>
>>> - ...
>>> - <artifactId>hadoop-common</artifactId>
>>> - ...
>>> - <artifactId>hadoop-core</artifactId>
>>> - ...
>>> - <artifactId>hadoop-aws</artifactId>
>>> - ...
>>>
>>> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
>>> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch <ra...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm also experiencing this with Flink 1.5.2. This is probably related to BucketingSink not working properly with S3 as filesystem because of the eventual-consistency of S3.
>>>>
>>>> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part of 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and not presto).
>>>>
>>>> Does anyone know if this fix would solve this issue?
>>>>
>>>> Thanks,
>>>> Rafi
>>>>
>>>> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer <de...@protonmail.com> wrote:
>>>>
>>>>> Hi, I'm running a scala flink app in an AWS EMR cluster (emr 5.17, hadoop 2.8.4)  with flink parallelization set to 400. The source is a Kafka topic and sinks to S3 in the format of: s3://<day>/<hour>/<worker_number>/<files>. There's potentially 400 files writing simultaneously.
>>>>>
>>>>> Configuration:
>>>>> - Flink v1.5.2
>>>>> - Checkpointing enabled w/ RocksDb (flink-statebackend-rocksdb_2.11, v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause between checkpoints in 2 mins. Timeout is set to 2 mins.
>>>>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>>>>> - Batch file size is set to 5mb.
>>>>> - Batch rollover interval is set to 30min
>>>>> - Writer uses GZip compression
>>>>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1, hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>>>>
>>>>> The app is able to run for hours straight, but occasionally (once or twice a day), it displays the following exception. When this happens, the app is able to recover from previous checkpoint, but I am concerned about the exception:
>>>>>
>>>>> Caused by: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)
>>>>>
>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)
>>>>>
>>>>> Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID: xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4365)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4312)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1270)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)
>>>>>
>>>>> - at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)
>>>>>
>>>>> And sometimes, it will show this:
>>>>>
>>>>> - java.lang.RuntimeException: Error while restoring BucketingSink state.
>>>>>
>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:888)
>>>>>
>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:767)
>>>>>
>>>>> - at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:394)
>>>>>
>>>>> - at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>>>>
>>>>> - at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>>>>
>>>>> - at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>>>
>>>>> - at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>>>>>
>>>>> - at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>>>>>
>>>>> - at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>>>>>
>>>>> - at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>>>
>>>>> What causes this and how can it be resolved? Thank you.
>>>>>
>>>>> There seems to be a related Flink ticket and PR here, but I'm not sure if this is the exact same issue and if it has been resolved:
>>>>> https://issues.apache.org/jira/browse/FLINK-6306
>>>>> https://github.com/apache/flink/pull/3752
>>>>> https://github.com/apache/flink/pull/4607

Re: Flink Kafka to BucketingSink to S3 - S3Exception

Posted by Ravi Bhushan Ratnakar <ra...@gmail.com>.
I made a few small changes to BucketingSink and implemented them as a new
CustomBucketingSink for my project, which works fine with the s3 and s3a
protocols. This implementation doesn't require XML file configuration;
instead, it uses the configuration provided through a Flink configuration
object by calling the setConfig method of BucketingSink.

On Sat 3 Nov, 2018, 09:24 Flink Developer <developer143@protonmail.com>
wrote:

> It seems the issue also appears when using
> *Flink version 1.6.2 . *
> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
> On Tuesday, October 30, 2018 10:26 PM, Flink Developer <
> developer143@protonmail.com> wrote:
>
> Hi, thanks for the info Rafi, that seems to be related.  I hope *Flink
> version 1.6.2* fixes this. Has anyone encountered this before?
>
> I would also like to note that my jar includes a *core-site.xml* file
> that uses **s3a**. Is this the recommended configuration to use with
> BucketingSink?   Should the sink be specified using
> *s3a://<bucket>/<prefix>* or  *s3://<bucket>/<prefix> *?
>
>    - <configuration>
>    -     <property>
>    -         <name>fs.s3.impl</name>
>    -         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>    -     </property>
>    -     <property>
>    -         <name>fs.s3a.buffer.dir</name>
>    -         <value>/tmp</value>
>    -     </property>
>    -     <property>
>    -         <name>fs.s3a.access.key</name>
>    -         <value>xxxxx</value>
>    -     </property>
>    -     <property>
>    -         <name>fs.s3a.secret.key</name>
>    -         <value>xxxxx</value>
>    -     </property>
>    -     <property>
>    -         <name>fs.s3a.buffer.dir</name>
>    -         <value>/tmp</value>
>    -     </property>
>    - </configuration>
>
>
> And my pom.xml uses:
>
>    - <artifactId>flink-s3-fs-hadoop</artifactId>
>    - ...
>    - <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>    - ...
>    - <artifactId>hadoop-hdfs</artifactId>
>    - ...
>    - <artifactId>hadoop-common</artifactId>
>    - ...
>    - <artifactId>hadoop-core</artifactId>
>    - ...
>    - <artifactId>hadoop-aws</artifactId>
>    - ...
>
>
> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch <ra...@gmail.com>
> wrote:
>
> Hi,
>
> I'm also experiencing this with Flink 1.5.2. This is probably related to
> BucketingSink not working properly with S3 as filesystem because of the
> eventual-consistency of S3.
>
> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part
> of 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and
> not presto).
>
> Does anyone know if this fix would solve this issue?
>
> Thanks,
> Rafi
>
>
> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer <
> developer143@protonmail.com> wrote:
>
>> Hi, I'm running a scala flink app in an AWS EMR cluster (emr 5.17, hadoop
>> 2.8.4)  with flink parallelization set to 400. The source is a Kafka topic
>> and sinks to S3 in the format of:
>> s3://<day>/<hour>/<worker_number>/<files>. There's potentially 400 files
>> writing simultaneously.
>>
>> *Configuration:*
>> - Flink v1.5.2
>> - Checkpointing enabled w/ RocksDb (flink-statebackend-rocksdb_2.11,
>> v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause
>> between checkpoints in 2 mins. Timeout is set to 2 mins.
>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>> - Batch file size is set to 5mb.
>> - Batch rollover interval is set to 30min
>> - Writer uses GZip compression
>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1,
>> hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>
>> The app is able to run for hours straight, but occasionally (once or
>> twice a day), it displays the following exception. When this happens, the
>> app is able to recover from previous checkpoint, but I am concerned about
>> the exception:
>>
>> *Caused by: java.io.IOException:
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>> Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID:
>> xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx*
>>
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)*
>>    - *at
>>    org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)*
>>
>> *Caused by:
>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>> Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID:
>> xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx*
>>
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4365)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4312)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1270)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)*
>>    - *at
>>    com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)*
>>
>>
>> *And sometimes, it will show this:*
>>
>>    - *java.lang.RuntimeException: Error while restoring BucketingSink
>>    state.*
>>    - *at
>>    org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:888)*
>>    - *at
>>    org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:767)*
>>    - *at
>>    org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:394)*
>>    - *at
>>    org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)*
>>    - *at
>>    org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)*
>>    - *at
>>    org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)*
>>    - *at
>>    org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)*
>>    - *at
>>    org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)*
>>    - *at
>>    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)*
>>    - *at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)*
>>
>>
>> What causes this and how can it be resolved? Thank you.
>>
>> There seems to be a related Flink ticket and PR here, but I'm not sure if
>> this is the exact same issue and if it has been resolved:
>> https://issues.apache.org/jira/browse/FLINK-6306
>> https://github.com/apache/flink/pull/3752
>> https://github.com/apache/flink/pull/4607
>>
>
>
>

Re: Flink Kafka to BucketingSink to S3 - S3Exception

Posted by Flink Developer <de...@protonmail.com>.
It seems the issue also appears with Flink 1.6.2.
‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On Tuesday, October 30, 2018 10:26 PM, Flink Developer <de...@protonmail.com> wrote:

> Hi, thanks for the info, Rafi; that seems to be related. I hope Flink 1.6.2 fixes this. Has anyone else encountered this?

Re: Flink Kafka to BucketingSink to S3 - S3Exception

Posted by Flink Developer <de...@protonmail.com>.
Hi, thanks for the info, Rafi; that seems to be related. I hope Flink 1.6.2 fixes this. Has anyone else encountered this?

I would also like to note that my jar includes a core-site.xml file that maps the s3 scheme to S3AFileSystem (*s3a*). Is this the recommended configuration for BucketingSink? Should the sink path be specified as s3a://<bucket>/<prefix> or s3://<bucket>/<prefix>?

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>xxxxx</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>xxxxx</value>
    </property>
</configuration>

And my pom.xml uses:

- <artifactId>flink-s3-fs-hadoop</artifactId>
- ...
- <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
- ...
- <artifactId>hadoop-hdfs</artifactId>
- ...
- <artifactId>hadoop-common</artifactId>
- ...
- <artifactId>hadoop-core</artifactId>
- ...
- <artifactId>hadoop-aws</artifactId>
- ...
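
On the s3:// versus s3a:// question above, it may help to picture how a path's scheme selects a filesystem class. Hadoop resolves a scheme through the configuration key fs.<scheme>.impl; the sketch below is only a toy model of that lookup using a plain map, not Hadoop's actual code, and the names SchemeLookupSketch and implFor are hypothetical. The last step is an assumption: it models some other setting taking precedence over the jar's core-site.xml, which would be consistent with EmrFileSystem (rather than S3AFileSystem) appearing in the stack trace. Real Hadoop may also supply its own defaults, so the null printed by the toy for s3a:// does not mean that scheme is unusable.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Toy model only: Hadoop's real lookup lives in its own FileSystem and
// Configuration classes. SchemeLookupSketch and implFor are hypothetical names.
final class SchemeLookupSketch {

    // Returns the class name configured under fs.<scheme>.impl, or null if unset.
    static String implFor(Map<String, String> conf, String path) {
        String scheme = URI.create(path).getScheme();
        return conf.get("fs." + scheme + ".impl");
    }

    public static void main(String[] args) {
        // The one scheme mapping from the core-site.xml quoted above.
        Map<String, String> conf = new HashMap<>();
        conf.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");

        // s3:// is resolved through fs.s3.impl.
        System.out.println(implFor(conf, "s3://bucket/prefix"));
        // s3a:// is looked up under fs.s3a.impl, which this toy map does not set.
        System.out.println(implFor(conf, "s3a://bucket/prefix"));

        // Assumption: a setting that wins over the jar's file changes what
        // s3:// resolves to, consistent with EmrFileSystem in the trace.
        conf.put("fs.s3.impl", "com.amazon.ws.emr.hadoop.fs.EmrFileSystem");
        System.out.println(implFor(conf, "s3://bucket/prefix"));
    }
}
```

If this model matches what happens on the cluster, checking which fs.s3.impl value is actually in effect at runtime would be a reasonable first step before choosing between the two path prefixes.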

‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On Sunday, October 28, 2018 8:08 AM, Rafi Aroch <ra...@gmail.com> wrote:

> Hi,
>
> I'm also experiencing this with Flink 1.5.2. It is probably because BucketingSink does not work reliably with S3 as the filesystem, given S3's eventual consistency.
>
> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part of the 1.6.2 release. It might help if you use flink-s3-fs-hadoop (and not flink-s3-fs-presto).
>
> Does anyone know if this fix would solve this issue?
>
> Thanks,
> Rafi

Re: Flink Kafka to BucketingSink to S3 - S3Exception

Posted by Rafi Aroch <ra...@gmail.com>.
Hi,

I'm also experiencing this with Flink 1.5.2. It is probably because
BucketingSink does not work reliably with S3 as the filesystem, given S3's
eventual consistency.

I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part of
the 1.6.2 release. It might help if you use flink-s3-fs-hadoop (and not
flink-s3-fs-presto).

Does anyone know if this fix would solve this issue?

Thanks,
Rafi
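
To make the eventual-consistency hypothesis above concrete, here is a toy, self-contained sketch. It is not Flink, EMRFS, or S3 code: EventuallyVisibleStore is a hypothetical in-memory store in which a newly written key only becomes visible after a fixed number of lookups. A rename that checks for its source too early fails, loosely analogous to the 404 on the metadata lookup in the reported trace, while a later attempt succeeds.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: an in-memory store whose new keys become visible after a
// delay. All names here are hypothetical and exist purely for illustration.
final class EventuallyVisibleStore {
    // For each key, how many more lookups will still report "not found".
    private final Map<String, Integer> lookupsUntilVisible = new HashMap<>();
    private final int lag;

    EventuallyVisibleStore(int lag) {
        this.lag = lag;
    }

    // Writes a key that stays invisible for the next `lag` lookups.
    void put(String key) {
        lookupsUntilVisible.put(key, lag);
    }

    // Returns false for the first `lag` lookups after put, then true.
    boolean exists(String key) {
        Integer remaining = lookupsUntilVisible.get(key);
        if (remaining == null) {
            return false;
        }
        if (remaining > 0) {
            lookupsUntilVisible.put(key, remaining - 1);
            return false;
        }
        return true;
    }

    // Mimics a rename that first looks up its source and fails if it is
    // not yet visible.
    boolean rename(String from, String to) {
        if (!exists(from)) {
            return false;
        }
        lookupsUntilVisible.remove(from);
        lookupsUntilVisible.put(to, 0); // simplification: target is visible at once
        return true;
    }

    public static void main(String[] args) {
        EventuallyVisibleStore store = new EventuallyVisibleStore(2);
        store.put("part-0.pending");
        // The first two attempts hit the visibility lag; the third succeeds.
        System.out.println(store.rename("part-0.pending", "part-0"));
        System.out.println(store.rename("part-0.pending", "part-0"));
        System.out.println(store.rename("part-0.pending", "part-0"));
    }
}
```

Whether this is what actually happens in the reported job is not established in this thread; the sketch only shows why an intermittent "not found" during rename is plausible under such a model.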

