Posted to user@flink.apache.org by Simone Cavallarin <ca...@hotmail.com> on 2020/12/02 15:20:31 UTC

Process windows not firing -> Can it be a Watermark issue?

Hi All,

My code is not firing the process windows. To test, I'm using a static gap of '2000' (DynamicSessionWindows() returns a fixed '2000', which, if I'm not wrong, should be 2 seconds).

        FlinkKafkaConsumer<Event> kafkaData =
                new FlinkKafkaConsumer<>("CorID_0", new EventDeserializationSchema(), p);
        // timestamps come from Event.get_Time(); forMonotonousTimestamps()
        // derives the watermarks from them
        WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withIdleness(Duration.ofMinutes(1))
                        .withTimestampAssigner((event, timestamp) -> event.get_Time());
        DataStream<Event> stream = env.addSource(
                kafkaData.assignTimestampsAndWatermarks(wmStrategy));

        // enrich each event with its session gap in milliseconds, computed per key
        DataStream<Tuple2<Event, Long>> enriched = stream
                .keyBy((Event event) -> event.get_CorrID())
                .map(new StatefulSessionCalculator());

        // event-time session windows; DynamicSessionWindows supplies the gap (fixed 2000 ms for this test)
        DataStream<String> winStream = enriched
                .keyBy(new MyKeySelector())
                .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()))
                .process(new MyProcessWindowFunction());

The "enriched" stream is where I enrich each message with a gap in millis, produced by a function I have implemented that calculates a new gap for every event. For testing purposes I inserted "2000" manually to see whether it fired every 2 seconds, but it never fires.
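A minimal plain-Java sketch of how session windows with a per-element gap are usually formed, assuming the common semantics: each element opens the window [timestamp, timestamp + gap), and overlapping or touching windows merge into one session. In Flink the gap would come from the DynamicSessionWindows class passed to withDynamicGap(...); here SessionModel and its methods are hypothetical names, and this is a model, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of event-time session windows with a
// per-element gap (not Flink code). Each element opens [ts, ts + gap);
// overlapping or touching windows are merged into one session.
final class SessionModel {

    static final class Window {
        final long start; // inclusive, ms
        final long end;   // exclusive, ms

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** timestamps[i] is the event time of element i, gapsMs[i] its gap (> 0). */
    static List<Window> sessions(long[] timestamps, long[] gapsMs) {
        List<Window> windows = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            windows.add(new Window(timestamps[i], timestamps[i] + gapsMs[i]));
        }
        windows.sort((a, b) -> Long.compare(a.start, b.start));

        List<Window> merged = new ArrayList<>();
        for (Window w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && w.start <= merged.get(last).end) {
                // w overlaps or touches the current session: extend it
                Window cur = merged.get(last);
                merged.set(last, new Window(cur.start, Math.max(cur.end, w.end)));
            } else {
                // no overlap with the previous session: start a new one
                merged.add(w);
            }
        }
        return merged;
    }
}
```

For events at 0, 1500, 3000 and 9000 ms with a fixed 2000 ms gap, this yields two sessions, [0, 5000) and [9000, 11000): under these semantics a session ends after a 2-second pause in event time, not every 2 seconds.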

I was then wondering: how can I run some checks under the hood? I want to understand whether my

    .withTimestampAssigner((Event, timestamp) -> {
        return Event.get_Time();
    });

has correctly assigned the watermark. I'm particularly concerned about this because when I debug the application I can't see any watermark assigned, but maybe I'm searching in the wrong place and this is not the reason for my issue?
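For reference, the timestamp assigner only attaches a timestamp to each record; the watermarks come from the generator created by forMonotonousTimestamps(). As far as I understand Flink 1.11, that generator remembers the highest timestamp seen so far and, each time it is asked to emit periodically, produces that value minus 1 ms. The plain-Java model below (MonotonousWatermarkModel is a hypothetical class, not Flink API) mimics only that behaviour so it can be checked in isolation. In a running job, one way to look at the actual watermark is to call ctx.timerService().currentWatermark() from inside a ProcessFunction.

```java
// Hypothetical model of a monotonous (ascending) timestamps watermark
// generator. Not Flink code: it only mimics the behaviour described above.
final class MonotonousWatermarkModel {
    // Highest event timestamp seen so far; the initial value makes the
    // first watermark Long.MIN_VALUE, meaning "no progress yet".
    private long maxTimestamp = Long.MIN_VALUE + 1;

    /** Called for every record with the timestamp returned by the assigner. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called on every periodic tick; returns the watermark to emit. */
    long onPeriodicEmit() {
        return maxTimestamp - 1;
    }
}
```

Feeding it the timestamps 1000, 2500 and 2000 and then ticking once gives a watermark of 2499; the late 2000 does not move it backwards. Note that nothing is emitted unless something actually triggers the periodic emission.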



Thanks!

Simone

Re: Process windows not firing -> Can it be a Watermark issue?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Simone,

Which version of Flink are you using? Have you enabled event time support via StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)? Are you sure that the topic you are consuming contains data? Maybe you can share the whole job with example data with us so that we can take a look at it as a whole.

The fact that the assigner has no fields to display simply means that it contains no fields, which is normal for lambda functions.
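To illustrate the point about lambdas, the sketch below uses reflection to count the instance fields of two lambdas. TimestampAssignerLike is a hypothetical stand-in with the same shape as Flink's TimestampAssigner (long extractTimestamp(T element, long recordTimestamp)), so the example runs without Flink. On HotSpot, a lambda that only uses its parameters (like the assigner in this thread) has no instance fields, while a lambda that captures a local variable stores it in a field. Lambda class layout is a JVM implementation detail, not a language guarantee.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical interface mirroring the shape of Flink's TimestampAssigner;
// it is not the real Flink type.
interface TimestampAssignerLike<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

final class LambdaFieldsDemo {

    // Counts instance (non-static) fields on the lambda's runtime class.
    // Reflects HotSpot's behaviour; other JVMs may differ.
    static int instanceFieldCount(Object lambda) {
        int count = 0;
        for (Field f : lambda.getClass().getDeclaredFields()) {
            if (!Modifier.isStatic(f.getModifiers())) {
                count++;
            }
        }
        return count;
    }

    // Like the assigner in this thread: uses only its parameters, captures nothing.
    static TimestampAssignerLike<Long> nonCapturing() {
        return (eventTime, recordTimestamp) -> eventTime;
    }

    // Captures 'offsetMs', which HotSpot stores in a field of the lambda class.
    static TimestampAssignerLike<Long> capturing(long offsetMs) {
        return (eventTime, recordTimestamp) -> eventTime + offsetMs;
    }
}
```

On HotSpot, instanceFieldCount(nonCapturing()) typically returns 0 and instanceFieldCount(capturing(5L)) returns 1, which is why a debugger shows nothing to inspect on the non-capturing assigner.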

Cheers,
Till


Re: Process windows not firing -> Can it be a Watermark issue?

Posted by Simone Cavallarin <ca...@hotmail.com>.
Hi Till

Super, understood! I will also read the page you linked.

Thanks, and have a nice eve.

best

s

________________________________
From: Till Rohrmann <tr...@apache.org>
Sent: 02 December 2020 17:44
To: Simone Cavallarin <ca...@hotmail.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Process windows not firing -> Can it be a Watermark issue?

Hi Simone,

You need to set this option because otherwise Flink will not generate the Watermarks. If you don't need watermarks (e.g. when using ProcessingTime), then the system needs to send fewer records over the wire. That's basically how Flink has been developed [1].

With Flink 1.12 this option no longer needs to be set because it is activated by default.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html

Cheers,
Till

On Wed, Dec 2, 2020 at 6:17 PM Simone Cavallarin <ca...@hotmail.com> wrote:
Hi Till and David,

First of all, thanks for the quick reply, really appreciated!

Drilling down to the issue:


  1.  No session is closing because there isn't a sufficiently long gap in the test data -> It was the first thing I thought of; before asking, I ran a test checking the gaps in my data and using a super small interval of 10 millis. The problem was not this.
  2.  The test runs to completion before the periodic watermark generator has generated any watermarks. -> The data has been flowing for hours and I can see everything while debugging the flow; every event had all its information. I'm mapping my stream and using the stream itself to compute the gap, so I played with it for a while.
  3.  Which version of Flink are you using? -> 1.11.2
  4.  Have you enabled event time support via StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)? -> BINGO! This was the issue: as soon as I added this line to the code, it started to work!! 🙂 (It would be nice to understand what this magic line is doing!)
  5.  The fact that assigner has no field to display simply means that it contains no fields which is normal for lambda functions -> Thanks, I didn't know that. Noted!

Why is StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) so important?

Thanks

s



________________________________
From: David Anderson <da...@alpinegizmo.com>
Sent: 02 December 2020 16:01
To: Simone Cavallarin <ca...@hotmail.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Process windows not firing -> Can it be a Watermark issue?

In cases like this one, one of the following tends to be the explanation:

- No session is closing because there isn't a sufficiently long gap in the test data.
- The test runs to completion before the periodic watermark generator has generated any watermarks.

David

From: Till Rohrmann <tr...@apache.org>
Sent: 02 December 2020 15:46
To: Simone Cavallarin <ca...@hotmail.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Process windows not firing -> Can it be a Watermark issue?

Hi Simone,

Which version of Flink are you using? Have you enabled event time support via StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)? Are you sure that the topic you are consuming contains data? Maybe you can share the whole job with example data with us so that we can take a look at it as a whole.

The fact that the assigner has no fields to display simply means that it contains no fields, which is normal for lambda functions.

Cheers,
Till



Re: Process windows not firing -> Can it be a Watermark issue?

Posted by Till Rohrmann <tr...@apache.org>.
Hi Simone,

You need to set this option because otherwise Flink will not generate the
Watermarks. If you don't need watermarks (e.g. when using ProcessingTime),
then the system needs to send fewer records over the wire. That's basically
how Flink has been developed [1].

With Flink 1.12 this option no longer needs to be set because it is
activated by default.
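As far as I can tell from the Flink 1.11 sources, the mechanism behind this is the auto-watermark interval: setStreamTimeCharacteristic(TimeCharacteristic.EventTime) sets the ExecutionConfig's auto-watermark interval to 200 ms, while the default ProcessingTime characteristic leaves it at 0, so periodic generators such as forMonotonousTimestamps() are never asked to emit. The plain-Java model below simulates that periodic timer; PeriodicEmissionModel and its parameters are hypothetical names, and this is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of periodic watermark emission (not Flink code).
// Every intervalMs of processing time a "timer" fires and the generator emits
// maxTimestampSeen - 1, like a monotonous-timestamps generator. An interval of
// 0 models the Flink 1.11 ProcessingTime default: no timer, so no watermarks.
final class PeriodicEmissionModel {

    /**
     * @param arrivalMs  processing time (ms) at which each event arrives, ascending
     * @param eventTsMs  event-time timestamp of each event (ms)
     * @param intervalMs auto-watermark interval; 0 or less disables emission
     * @param runForMs   how long (processing time, ms) the simulated job runs
     * @return the watermarks emitted, in order
     */
    static List<Long> emittedWatermarks(long[] arrivalMs, long[] eventTsMs,
                                        long intervalMs, long runForMs) {
        List<Long> emitted = new ArrayList<>();
        if (intervalMs <= 0) {
            return emitted; // no periodic timer, so the generator is never asked
        }
        long maxTs = Long.MIN_VALUE + 1; // first watermark would be Long.MIN_VALUE
        long lastEmitted = Long.MIN_VALUE;
        int next = 0;
        for (long tick = intervalMs; tick <= runForMs; tick += intervalMs) {
            // consume every event that arrived before or at this tick
            while (next < arrivalMs.length && arrivalMs[next] <= tick) {
                maxTs = Math.max(maxTs, eventTsMs[next]);
                next++;
            }
            long watermark = maxTs - 1;
            // keep the model simple: record a watermark only when it advances
            if (watermark > lastEmitted) {
                emitted.add(watermark);
                lastEmitted = watermark;
            }
        }
        return emitted;
    }
}
```

For events arriving at 50, 120 and 330 ms with timestamps 1000, 1800 and 2600, an interval of 200 over 600 ms yields [1799, 2599], while an interval of 0 yields an empty list no matter how long the job runs. That matches the symptom in this thread: records flow, but event-time windows never fire.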

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html

Cheers,
Till


Re: Process windows not firing -> Can it be a Watermark issue?

Posted by Simone Cavallarin <ca...@hotmail.com>.
Hi Till and David,

First of all, thanks for the quick reply, really appreciated!

Drilling down to the issue:


  1.  No session is closing because there isn't a sufficiently long gap in the test data -> It was the first thing I thought of; before asking, I ran a test checking the gaps in my data and using a super small interval of 10 millis. The problem was not this.
  2.  The test runs to completion before the periodic watermark generator has generated any watermarks. -> The data has been flowing for hours and I can see everything while debugging the flow; every event had all its information. I'm mapping my stream and using the stream itself to compute the gap, so I played with it for a while.
  3.  Which version of Flink are you using? -> 1.11.2
  4.  Have you enabled event time support via StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)? -> BINGO! This was the issue: as soon as I added this line to the code, it started to work!! 🙂 (It would be nice to understand what this magic line is doing!)
  5.  The fact that assigner has no field to display simply means that it contains no fields which is normal for lambda functions -> Thanks, I didn't know that. Noted!

Why is StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) so important?

Thanks

s






Re: Process windows not firing -> Can it be a Watermark issue?

Posted by David Anderson <da...@alpinegizmo.com>.
In cases like this one, one of the following tends to be the explanation:

- No session is closing because there isn't a sufficiently long gap in the test data.
- The test runs to completion before the periodic watermark generator has generated any watermarks.
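Both explanations come down to the same firing rule. Flink's EventTimeTrigger fires a window once the watermark reaches the window's maxTimestamp(), which for a TimeWindow is end - 1. The helper below (SessionFiring is a hypothetical name, plain Java, not Flink API) encodes only that rule: if events keep arriving within the gap, the session's end keeps moving forward, and if no watermark is ever produced, the rule is never satisfied.

```java
// Hypothetical helper (not Flink API) expressing the event-time firing rule
// for a session window [start, end): it fires once the watermark reaches
// end - 1, the last millisecond that belongs to the window.
final class SessionFiring {

    static boolean fires(long sessionEndExclusive, long currentWatermark) {
        return currentWatermark >= sessionEndExclusive - 1;
    }

    /** First watermark in the sequence that fires the window, or null if none does. */
    static Long firstFiringWatermark(long sessionEndExclusive, long[] watermarks) {
        for (long wm : watermarks) {
            if (fires(sessionEndExclusive, wm)) {
                return wm; // autoboxed to Long
            }
        }
        return null; // e.g. no watermarks at all, or they never pass end - 1
    }
}
```

For a session ending at 11000 ms (last event at 9000 ms with a 2000 ms gap), the watermarks 8999 and 10500 do not fire it, while 10999 does; with an empty watermark sequence the method returns null.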

David
