Posted to user@flink.apache.org by Pawel Bartoszek <pa...@gmail.com> on 2020/01/23 23:29:02 UTC

FileStreamingSink is using the same counter for different files

Hi,


Flink's streaming file sink is designed to use a global counter when creating
files to avoid overwrites. I am running Flink 1.8.2 with Kinesis Analytics
(managed Flink provided by AWS) with bulk writes (the rolling policy is
hardcoded to roll over on checkpoint).
My job is configured to checkpoint every minute and runs with parallelism 1.
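
For context, this is roughly how the sink is wired up (a minimal sketch on the
Flink 1.8 API; the base path, DataRecord type, Parquet writer, and bucket
format below are illustrative stand-ins, not the job's actual values):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

// Bulk-encoded sinks roll the in-progress part file on every checkpoint;
// part files are named part-<subtaskIndex>-<counter>.
StreamingFileSink<DataRecord> sink = StreamingFileSink
        .forBulkFormat(
                new Path("s3://my-bucket/invalid-records"),            // assumed base path
                ParquetAvroWriters.forReflectRecord(DataRecord.class)) // assumed bulk format
        // minute-based bucket ids such as 2020-01-22T15_06_00Z
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd'T'HH_mm_00'Z'"))
        .build();

invalidRecords.addSink(sink); // invalidRecords: the DataStream<DataRecord> being sunk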

The problem is that the same counter 616 is used for both
files invalid-records/2020-01-22T15_06_00Z/part-0-616
and invalid-records/2020-01-22T15_05_00Z/part-0-616.

15:06:37
{
  "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
  "logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
  "message": "Committing invalid-records/2020-01-22T15_06_00Z/part-0-616 with MPU ID f7PQc2D82.kKaDRS.RXYYS8AkLd5q_9ogw3WZJJg2KGABhYgjtv.eJbqQ_UwpzciYb.TDTIkixulkmaTMyyuwmr6c5eC61aenoo2m4cj7wAT9v0JXB3i6gBArw.HpSLxpUBTEW6PT3aN9XKPZmT2kg--",
  "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
  "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
  "applicationVersionId": "33",
  "messageSchemaVersion": "1",
  "messageType": "INFO"
}
15:07:37
{
  "locationInformation": "org.apache.flink.fs.s3.common.writer.S3Committer.commit(S3Committer.java:64)",
  "logger": "org.apache.flink.fs.s3.common.writer.S3Committer",
  "message": "Committing invalid-records/2020-01-22T15_05_00Z/part-0-616 with MPU ID XoliYkdvP1Cc3gePyteIGhTqF1LrID8rEyddaPXRNPQYkWDNKpDF0tnYuhDBqywAqCWf4nJTOJ2Kx_a_91KTyVTvZ7GkKs25nseGs4jDR6Y5Nxuai47aKNeWeS4bs9imMJ1iAxbd7lRQyxnM5qwDeA--",
  "threadName": "Async calls on Source: Custom Source -> Extract Td-agent message -> To DataRecord -> Find invalid records -> Sink: Unnamed (1/1)",
  "applicationARN": "arn:aws:kinesisanalytics:eu-west-1:679291754673:application/pawel",
  "applicationVersionId": "33",
  "messageSchemaVersion": "1",
  "messageType": "INFO"
}

Thanks,
Pawel

Re: FileStreamingSink is using the same counter for different files

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Pawel,

You are correct that the write method invocation is guaranteed to be
thread-safe for the same sub-operator instance.
But I am not sure that having a unique counter per subtask across
buckets would add much to the user experience of the sink.
I think that in both cases, the interpretation of the part files would
be the same.

I may be wrong though so please let me know if this is a deal breaker for you.

Cheers,
Kostas


Re: FileStreamingSink is using the same counter for different files

Posted by Pawel Bartoszek <pa...@gmail.com>.
Hi Kostas,

Thanks for confirming that. I started thinking it might be more useful, or
more user-friendly, to use a unique counter across buckets for the same
operator subtask.
The way I could imagine this working is to pass the max counter to the
https://github.com/apache/flink/blob/e7e24471240dbaa6b5148d406575e57d170b1623/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L204
write method, or to have each bucket hold a reference to its owning Buckets
instance and read the global counter from there. As far as I know, the write
method invocation is guaranteed to be thread-safe for the same sub-operator
instance.
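
To make that concrete, here is a rough sketch of the signature change I have
in mind (method and field names only approximate the linked Bucket class;
this is not the actual source):

// current signature: void write(IN element, long currentTime)
void write(IN element, long currentTime, long subtaskMaxPartCounter) throws IOException {
    if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
        // open the new part file using the subtask-wide counter instead of
        // the bucket-local one, so names are unique across buckets
        rollPartFile(currentTime, subtaskMaxPartCounter);
    }
    inProgressPart.write(element, currentTime);
}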

Thanks,
Pawel


Re: FileStreamingSink is using the same counter for different files

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Pawel,

You are correct that counters are unique within the same bucket but
NOT across buckets. Across buckets, you may see the same counter being
used.
The max counter is used only upon restoring from a failure, resuming
from a savepoint, or rescaling, and this is done to guarantee that no
valid data are overwritten while limiting the state that Flink has to
keep internally. For a more detailed discussion of the why, you can
have a look here: https://issues.apache.org/jira/browse/FLINK-13609
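
To illustrate the restore-time bookkeeping with simplified names (a sketch,
not the actual Flink source): each subtask takes the maximum over the
counters found in its restored state, and new part files start above that
value.

long restoredMax = 0L;
for (long counter : restoredMaxCounters) { // counters recovered from checkpointed state
    restoredMax = Math.max(restoredMax, counter);
}
maxPartCounter = Math.max(maxPartCounter, restoredMax);
// Only this single long per subtask has to be kept in state, yet no part
// file written before the restore can ever be overwritten.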

Cheers,
Kostas


Re: FileStreamingSink is using the same counter for different files

Posted by Pawel Bartoszek <pa...@gmail.com>.
I have looked into the source code, and it looks like the same counter value
being used in two buckets is correct.
Each Bucket class
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
is passed a partCounter in its constructor. Whenever a part file is rolled
over, the counter is incremented within the scope of that bucket. It can
happen that two or more buckets are active and the counter is increased
independently inside each of them, so that the counters become equal.
However, the global max counter maintained by the Buckets class always tracks
the maximum part counter, so that a newly created bucket is passed the
correct part counter.
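
A condensed sketch of that interplay (class and field names simplified; this
is not the real source):

import java.util.HashMap;
import java.util.Map;

class Bucket {
    long partCounter; // local to this bucket
    Bucket(long initialCounter) { this.partCounter = initialCounter; }
    void roll() { partCounter++; } // each rollover increments independently
}

class Buckets {
    long maxPartCounter = 0; // subtask-wide maximum across all buckets
    Map<String, Bucket> active = new HashMap<>();

    Bucket getOrCreate(String bucketId) {
        // a newly created bucket starts from the current subtask-wide max...
        return active.computeIfAbsent(bucketId, id -> new Bucket(maxPartCounter));
    }

    void roll(Bucket bucket) {
        bucket.roll();
        // ...but two already-active buckets increment their own counters, so
        // they can reach the same value; the global max just tracks the peak
        maxPartCounter = Math.max(maxPartCounter, bucket.partCounter);
    }
}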

I have based my analysis on the logs from my job below. Note that the same
counter value is used for part-0-8 twice, once per bucket (at 14:58:23 and
14:58:42).

2020-01-24 14:57:41 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 opening new part file "part-0-6" for bucket id=2020-01-24T14_54_00Z.
2020-01-24 14:57:41 [Async Sink: Unnamed (1/1)] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 received completion notification for checkpoint with id=7.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 checkpointing for checkpoint with id=8 (max part counter=7).
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to element
2020-01-24 14:58:11 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 opening new part file "part-0-7" for bucket id=2020-01-24T14_54_00Z.
2020-01-24 14:58:11 [Async Sink: Unnamed (1/1)] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 received completion notification for checkpoint with id=8.
2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to element
2020-01-24 14:58:23 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_55_00Z.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 checkpointing for checkpoint with id=9 (max part counter=9).
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on checkpoint.
2020-01-24 14:58:41 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and bucketPath=s3://xxx
2020-01-24 14:58:41 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z due to element
2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 opening new part file "part-0-8" for bucket id=2020-01-24T14_54_00Z.
2020-01-24 14:58:42 [Async Sink: Unnamed (1/1)] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 received completion notification for checkpoint with id=9.
2020-01-24 14:58:42 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z due to element
2020-01-24 14:58:43 [Sink (1/1)-thread-0] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 opening new part file "part-0-9" for bucket id=2020-01-24T14_55_00Z.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] INFO  org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 checkpointing for checkpoint with id=10 (max part counter=10).
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_54_00Z on checkpoint.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_54_00Z and bucketPath=s3://xxx
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Bucket  - Subtask 0 closing in-progress part file for bucket id=2020-01-24T14_55_00Z on checkpoint.
2020-01-24 14:59:11 [Async Sink: Unnamed (1/1)] DEBUG org.apache.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 checkpointing: BucketState for bucketId=2020-01-24T14_55_00Z and bucketPath=s3://xxx


Thanks,
Pawel

