Posted to user@flink.apache.org by John Smith <ja...@gmail.com> on 2022/01/31 21:28:19 UTC

Tumbling window apply will not "fire"

Hi, I have the following job. I'm expecting
System.out.println(key.toString()); to at least print, but nothing prints.

- .flatMap: Fires and prints my debug message once, as expected.
- .keyBy: Also fires, but prints my debug message twice.
- .apply: Doesn't seem to fire; the debug statement never prints.
  I'm expecting it to print the key from the keyBy above.

DataStream<MyEvent> slStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .uid(kafkaTopic).name(kafkaTopic)
        .setParallelism(1)
        .flatMap(new MapToMyEvent("my-event", "message")) // <--- This works
        .uid("map-json-logs").name("map-json-logs");

slStream.keyBy(new MinutesKeySelector(windowSizeMins)) // <--- This prints twice
        .window(TumblingEventTimeWindows.of(Time.minutes(windowSizeMins)))
        .apply(new WindowFunction<MyEvent, MyEvent, Tuple3<String, String, String>, TimeWindow>() {
            @Override
            public void apply(Tuple3<String, String, String> key, TimeWindow window,
                              Iterable<MyEvent> input, Collector<MyEvent> out) throws Exception {
                // This should print.
                System.out.println(key.toString());

                // Do nothing for now
            }
        })
        .uid("process").name("process");

Re: Tumbling window apply will not "fire"

Posted by John Smith <ja...@gmail.com>.
OK, it's working! Thanks. Just out of curiosity, why does the println in
keyBy print twice?


Re: Tumbling window apply will not "fire"

Posted by John Smith <ja...@gmail.com>.
Oh OK. I was reading here:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/learn-flink/streaming_analytics/#latency-vs-completeness
and I did a cut and paste, lol.

OK, I'll let you know.


Re: Tumbling window apply will not "fire"

Posted by Dario Heinisch <da...@gmail.com>.
Then you should be using a processing-time window, in your case:
TumblingProcessingTimeWindows

See
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/
for more info.
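
A note on what "the last 5 minutes" means with a tumbling window: the windows are aligned to the epoch, not to the arrival of the first element. The plain-Java sketch below is not Flink code; the class and method names are hypothetical, and it assumes a window offset of 0. It only shows the arithmetic. For example, an element processed at 21:28:19 UTC falls into the window [21:25:00, 21:30:00).

```java
import java.time.Instant;

// Plain-Java illustration of tumbling-window alignment; hypothetical names, not Flink code.
class TumblingWindowBounds {

    // Start (inclusive) of the epoch-aligned window that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // End (exclusive) of the same window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long fiveMinutesMs = 5 * 60 * 1000L;
        // Processing time is simply the wall clock of the machine running the operator.
        long now = System.currentTimeMillis();
        System.out.println("current window: "
                + Instant.ofEpochMilli(windowStart(now, fiveMinutesMs)) + " .. "
                + Instant.ofEpochMilli(windowEnd(now, fiveMinutesMs)));
    }
}
```

In the job from the original post, the change would presumably be limited to the window assigner, i.e. TumblingProcessingTimeWindows.of(Time.minutes(windowSizeMins)) in place of the event-time assigner, assuming the rest of the pipeline stays as it is.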


Re: Tumbling window apply will not "fire"

Posted by John Smith <ja...@gmail.com>.
Hi Dario, I don't care about event time. I just want a tumbling window
over processing time, i.e. count whatever I have in the last 5 minutes.


Re: Tumbling window apply will not "fire"

Posted by Dario Heinisch <da...@gmail.com>.
Hi John

This is because you are using event time (TumblingEventTimeWindows) but
you do not have an event-time watermark strategy. Without watermarks,
event time never advances, so the window never fires.
It is also why I opened
https://issues.apache.org/jira/browse/FLINK-24623: I feel like Flink
should throw an exception on startup in that case.

Take a look at the documentation at:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
which should have everything.

> In order to work with event time, Flink needs to know the events
> timestamps, meaning each element in the stream needs to have its event
> timestamp assigned. This is usually done by accessing/extracting the
> timestamp from some field in the element by using a TimestampAssigner.
>
> Timestamp assignment goes hand-in-hand with generating watermarks,
> which tell the system about progress in event time. You can configure
> this by specifying a WatermarkGenerator.
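
To make the effect concrete, here is a minimal plain-Java model of an event-time tumbling window. It is a sketch, not Flink's implementation: EventTimeWindowSketch and its methods are hypothetical names, and the only behaviour it assumes is that an event-time window fires once the watermark reaches the window's last timestamp (its end minus 1 ms).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java model of an event-time tumbling window; hypothetical names, not Flink code.
class EventTimeWindowSketch {
    private final long sizeMs;
    // Window start -> buffered elements, ordered by window start.
    private final TreeMap<Long, List<String>> windows = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    EventTimeWindowSketch(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Buffer an element in the window that covers its event timestamp.
    void add(String value, long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, sizeMs);
        windows.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    // Advance the watermark and return the contents of every window it has passed.
    List<List<String>> advanceWatermark(long newWatermarkMs) {
        watermark = Math.max(watermark, newWatermarkMs);
        List<List<String>> fired = new ArrayList<>();
        // A window [start, start + size) fires when watermark >= start + size - 1.
        while (!windows.isEmpty() && windows.firstKey() + sizeMs - 1 <= watermark) {
            fired.add(windows.pollFirstEntry().getValue());
        }
        return fired;
    }

    // Number of windows still holding buffered elements.
    int pendingWindows() {
        return windows.size();
    }

    public static void main(String[] args) {
        EventTimeWindowSketch window = new EventTimeWindowSketch(60_000L);
        window.add("a", 5_000L);
        window.add("b", 59_000L);
        // Elements are buffered, but nothing has fired yet.
        System.out.println(window.pendingWindows());          // prints 1
        // Only a watermark at or past the window's last timestamp releases it.
        System.out.println(window.advanceWatermark(59_999L)); // prints [[a, b]]
    }
}
```

With WatermarkStrategy.noWatermarks(), nothing ever plays the role of advanceWatermark, so elements stay buffered and apply is never called. If event time is actually wanted, WatermarkStrategy.forBoundedOutOfOrderness(...) combined with withTimestampAssigner(...) is a common starting point; a suitable out-of-orderness bound depends on the data.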

Best regards,

Dario
