Posted to user@flink.apache.org by Arti Pande <pa...@gmail.com> on 2020/09/09 12:52:15 UTC

Re: Watermark generation issues with File sources in Flink 1.11.1

Hi Aljoscha,

By "checkpoints do not work" what I mean is ever since Flink 1.9.2 till
1.11.1 when using File source the source operator (guessing split
enumerator or metadata reader) finishes immediately after starting (and
assigning the splits to split readers) hence when first checkpoint is
triggered, it sees the state of the first operator i.e. source as finished
and hence does not do any checkpointing. Thats' what you can see in logs
and also on the Flink UI for checkpoints. It assumes that the pipeline is
about to finish shortly and aborts the checkpoint.

This, along with the watermark generation problems, makes it difficult to
use the file source in production.
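
For reference, here is a minimal sketch of the kind of pipeline that shows
this behaviour (the path, input format, and rescan interval are placeholders,
not our actual job):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class FileSourceCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // With PROCESS_ONCE the monitoring source emits all splits and then
        // finishes, so every later checkpoint sees a FINISHED source task and
        // is aborted. PROCESS_CONTINUOUSLY keeps the monitoring operator alive.
        DataStream<String> lines = env.readFile(
                new TextInputFormat(new Path("/data/input")), // placeholder path
                "/data/input",
                FileProcessingMode.PROCESS_CONTINUOUSLY,      // vs. PROCESS_ONCE
                10_000L);                                     // rescan interval (ms)

        lines.print();
        env.execute("file-source-checkpoint-sketch");
    }
}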


On Mon, Aug 24, 2020 at 4:01 PM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi Arti,
>
> what exactly do you mean by "checkpoints do not work"? Are there
> exceptions being thrown? How are you writing your file-based sources,
> what API methods are you using?
>
> Best,
> Aljoscha
>
> On 20.08.20 16:21, Arti Pande wrote:
> > Hi Till,
> >
> > Thank you for your quick response. Both the AssignerWithPeriodicWatermarks
> > and WatermarkStrategy I am using are very simple ones.
> >
> > *Code for AssignerWithPeriodicWatermarks:*
> >
> > import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> > import org.apache.flink.streaming.api.watermark.Watermark;
> >
> > public class CustomEventTimeWatermarkGenerator
> >         implements AssignerWithPeriodicWatermarks<MyPojo> {
> >
> >     private final long maxOutOfOrderness = 0;
> >     private long currentMaxTimestamp;
> >
> >     @Override
> >     public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
> >         long timestamp = myPojo.getInitiationTime().toEpochMilli();
> >         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
> >         return timestamp;
> >     }
> >
> >     @Override
> >     public Watermark getCurrentWatermark() {
> >         // Called periodically by Flink; emits the highest timestamp seen.
> >         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
> >     }
> > }
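> >
> > It is attached with the pre-1.11 API (a sketch; "events" stands for the
> > DataStream<MyPojo> coming out of the source):
> >
> > events.assignTimestampsAndWatermarks(new CustomEventTimeWatermarkGenerator());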
> >
> >
> > *Code for WatermarkStrategy:*
> >
> > WatermarkStrategy<MyPojo> watermarkStrategy = WatermarkStrategy
> >         .<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
> >         .withTimestampAssigner(
> >                 (event, timestamp) -> event.getInitiationTime().toEpochMilli());
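> >
> > And the new strategy is attached the same way (same assumed "events"
> > stream):
> >
> > DataStream<MyPojo> withTimestamps =
> >         events.assignTimestampsAndWatermarks(watermarkStrategy);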
> >
> >
> > Thanks & regards,
> > Arti
> >
> >
> > On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann <tr...@apache.org> wrote:
> >
> >> Hi Arti,
> >>
> >> thanks for sharing this feedback with us. The WatermarkStrategy has been
> >> introduced quite recently and might have some rough edges. I am pulling
> >> in Aljoscha and Klou, who have worked on this feature and might be able
> >> to help you. To better understand your problem, it would be great if you
> >> could share the AssignerWithPeriodicWatermarks/WatermarkStrategy code
> >> with us.
> >>
> >> For the file source, the Flink community has recently introduced a new
> >> source abstraction which will also support checkpoints for file sources
> >> once the file source connector has been migrated to the new interfaces.
> >> The community is currently working on it.
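> >>
> >> As a rough sketch of what usage might look like once that migration
> >> lands (the FileSource and TextLineFormat names here are assumptions
> >> based on the new unified source interfaces, not a released API):
> >>
> >> FileSource<String> source = FileSource
> >>         .forRecordStreamFormat(new TextLineFormat(), new Path("/data/input"))
> >>         .build();
> >>
> >> DataStream<String> lines = env.fromSource(
> >>         source, WatermarkStrategy.noWatermarks(), "file-source");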
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Aug 19, 2020 at 5:38 PM Arti Pande <pa...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> When migrating a Stream API based Flink application from 1.9.2 to
> >>> 1.11.1, watermark generation has issues with the file source alone. It
> >>> works well with the Kafka source.
> >>>
> >>> With 1.9.2, a custom watermark generator implementing
> >>> AssignerWithPeriodicWatermarks was to be used. Starting with 1.11.1 it
> >>> is deprecated and is to be replaced with WatermarkStrategy (which
> >>> combines WatermarkGenerator and TimestampAssigner).
> >>>
> >>> With Flink 1.11.1 and the Kafka source, both of the above options (the
> >>> old AssignerWithPeriodicWatermarks and the new WatermarkStrategy) work
> >>> perfectly well, but with the file source neither of them works. The
> >>> watermark assigner never advances the watermarks, so stateful operators
> >>> never clear their state, leading to erroneous results and continuously
> >>> increasing memory usage.
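> >>>
> >>> A simple way to observe this is the probe below (a sketch; "stream" is
> >>> an assumed DataStream<MyPojo> after watermark assignment). It prints the
> >>> current watermark for each element; with the Kafka source it advances,
> >>> with the file source it stays stuck at Long.MIN_VALUE:
> >>>
> >>> import org.apache.flink.streaming.api.functions.ProcessFunction;
> >>> import org.apache.flink.util.Collector;
> >>>
> >>> stream.process(new ProcessFunction<MyPojo, MyPojo>() {
> >>>     @Override
> >>>     public void processElement(MyPojo value, Context ctx,
> >>>             Collector<MyPojo> out) {
> >>>         // Watermark as seen by this operator when the element arrives.
> >>>         System.out.println("watermark = "
> >>>                 + ctx.timerService().currentWatermark());
> >>>         out.collect(value);
> >>>     }
> >>> });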
> >>>
> >>> The same code works well with the Kafka source. Is this a known issue?
> >>> If so, is a fix planned shortly?
> >>>
> >>> A side note (and probably a candidate for a separate email, but I will
> >>> write it here): even checkpoints do not work with the File source since
> >>> 1.9.2, and this is still a problem with 1.11.1. Just wondering whether
> >>> the File source with the Stream API is not a priority in Flink
> >>> development? If so, we can rethink our sources.
> >>>
> >>> Thanks & regards,
> >>> Arti
> >>>
> >>
> >
>
>

Re: Watermark generation issues with File sources in Flink 1.11.1

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks David! This saved me quite some time.

Aljoscha

On 09.09.20 19:58, David Anderson wrote:
> Arti,
> 
> The problem with watermarks and the File source operator will be fixed in
> 1.11.2 [1]. This bug was introduced in 1.10.0 and isn't related to the new
> WatermarkStrategy API.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-19109
> 
> David


Re: Watermark generation issues with File sources in Flink 1.11.1

Posted by David Anderson <da...@alpinegizmo.com>.
Arti,

The problem with watermarks and the File source operator will be fixed in
1.11.2 [1]. This bug was introduced in 1.10.0 and isn't related to the new
WatermarkStrategy API.

[1] https://issues.apache.org/jira/browse/FLINK-19109

David
