Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2020/12/02 15:46:51 UTC

Re: Process windows not firing -> Can it be a Watermark issue?

Hi Simone,

Which version of Flink are you using? Have you enabled event time support
via
StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)?
Are you sure that the topic you are consuming contains data? Maybe you can
share the whole job with example data with us so that we can take a look at
it as a whole.
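
To illustrate why both questions matter, here is a rough plain-Java model
(no Flink dependency) of a single key's event-time session window. It is a
simplified sketch, not Flink's implementation: the class and method names
are made up for illustration, and it assumes ascending timestamps from a
single source partition, a watermark that trails the highest timestamp seen
by 1 ms, and a window that fires once the watermark reaches its last
millisecond.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of one key's event-time session window. */
final class SessionWindowSketch {

    /**
     * Replays ascending event timestamps and returns one {windowEnd, watermark}
     * pair for every session window that fires along the way.
     */
    static List<long[]> firedWindows(long[] timestamps, long gapMillis) {
        List<long[]> fired = new ArrayList<>();
        boolean open = false;   // is there a session window waiting to fire?
        long windowEnd = 0L;    // exclusive end of the open session window

        for (long ts : timestamps) {
            // Assumed watermark model: highest timestamp seen so far minus 1 ms.
            long watermark = ts - 1;

            // An event later than the open session's end starts a new session.
            // Its watermark is then at least windowEnd - 1, so the old window fires.
            if (open && ts > windowEnd) {
                fired.add(new long[] {windowEnd, watermark});
            }

            // The event either extends the open session or opens a new one.
            windowEnd = ts + gapMillis;
            open = true;
        }
        // Whatever is still open here never fires: no later event moved the watermark.
        return fired;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 1_500L, 2_500L, 6_000L};
        for (long[] w : firedWindows(timestamps, 2_000L)) {
            System.out.println("window end=" + w[0] + " fired at watermark=" + w[1]);
        }
    }
}
```

In this model a 2000 ms gap does not produce a result every two seconds of
wall-clock time. A window only fires when a later event pushes the watermark
past its end, so an empty or stalled topic would leave the last session open
indefinitely.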

The fact that the assigner has no fields to display simply means that it
contains no fields, which is normal for lambda functions.

Cheers,
Till

On Wed, Dec 2, 2020 at 4:21 PM Simone Cavallarin <ca...@hotmail.com>
wrote:

> Hi All,
>
> My code is not firing the process windows. I'm giving a static gap of
> '2000' to test (DynamicSessionWindows() is returning a fixed '2000',
> which, if I'm not wrong, should be 2 seconds).
>
>         FlinkKafkaConsumer<Event> kafkaData =
>                 new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
>         WatermarkStrategy<Event> wmStrategy =
>                 WatermarkStrategy
>                         .<Event>forMonotonousTimestamps()
>                         .withIdleness(Duration.ofMinutes(1))
>                         .withTimestampAssigner((Event, timestamp) -> {
>                             return Event.get_Time();
>                         });
>         DataStream<Event> stream = env.addSource(
>                 kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>
>
>         DataStream<Tuple2<Event, Long>> enriched = stream
>                 .keyBy((Event CorrID) -> CorrID.get_CorrID())
>                 .map(new StatefulSessionCalculator());
>
>         DataStream<String> WinStream = enriched
>                 .keyBy(new MyKeySelector())
>                 .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()))
>                 .process(new MyProcessWindowFunction());
>
> The "enriched" stream is where I'm enriching the message with the millis, to
> use this number as a gap based on a function that I have implemented, which
> provides a calculated gap for every event. For testing purposes I have
> inserted "2000" manually, to see if it was firing every 2 seconds, but it is
> never firing.
>
> I was then wondering how I can run some checks under the hood, to
> understand whether my
>                         .withTimestampAssigner((Event, timestamp) -> {
>                             return Event.get_Time();
>                         });
> has correctly assigned the watermark. I'm particularly concerned about
> this because if I debug the application I can't see any watermark assigned,
> but maybe I'm searching in the wrong place and this is not the reason for
> my issue?
>
>
>
>
> Thanks!
>
> Simone
>