Posted to user@flink.apache.org by Shilpa Shankar <ss...@bandwidth.com> on 2022/05/09 12:54:52 UTC

Notify on 0 events in a Tumbling Event Time Window

Hello,
We are building a Flink use case where we consume from a Kafka topic,
perform aggregations, and generate alerts based on average, max, and min
thresholds. We also need to notify users when there are 0 events in a
Tumbling Event Time Window, and we are having trouble coming up with a
solution for that. The options we considered are below; please let us
know if there are other ideas we haven't looked into.

[1] Queryable State: Save the keys in each of the Process Window Functions.
Query the state from an external application and alert when a key is
missing after the 20-minute interval has expired. We see that the Queryable
State feature is slated for deprecation, and we do not want to go down this
path when we already know there is an EOL for it.

[2] Use Processing Time Windows: Using processing time instead of event
time would have been an option if our downstream applications sent out
events in real time. Maintenance of the downstream applications, delays,
etc. would result in a lot of data loss, which is undesirable.

Flink version : 1.14.3

Thanks,
Shilpa

RE: Notify on 0 events in a Tumbling Event Time Window

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Shilpa,

There is no need to have artificial messages in the input Kafka topic (and I don’t see where Andrew suggests this 😊)

However, your use case is not 100% clear as to which keys you want to emit 0-count window results for. Either:

  *   A) For all keys your job has ever seen (that’s easy), or
  *   B) For all keys your job has seen, but you stop sending 0-count windows after the first one is emitted, and only start again with the key when there is a new input event on the key, or
  *   C) For all keys from a pre-selection of keys

A KeyedProcessFunction is the way to go.
I’ll sketch a solution for scenario A); the others are similar (Scala-ish):

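class Manual0Windowing extends KeyedProcessFunction[…] {

  // aggregated window state (e.g. a ValueState), kept in a field so that
  // processElement and onTimer can both reach it
  var state = …

  override def open(…) = {
    // register the state primitive with the default 0-window state
    state = …
  }

  override def processElement(event, ctx, out) = {
    val windowEnd = getWindowEndTime(event)
    // one timer per window end; registering the same timestamp twice is a no-op
    ctx.timerService.registerEventTimeTimer(windowEnd)
    var currentState = state.value // or the default 0-window state
    currentState = aggregate(currentState, event)
    state.update(currentState)
  }

  override def onTimer(timestamp, ctx, out) = {
    val currentState = state.value // or the default 0-window state

    if (is0Window(currentState)) {
      // for scenario B) drop the next line
      ctx….registerEventTimeTimer(timestamp + tumblingWindowTime)
    } else {
      ctx….registerEventTimeTimer(timestamp + tumblingWindowTime)
    }
    // emit the window result (possibly the 0-window) and reset for the next window
    out.collect(currentState)
    state.clear()
  }

}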

… Just to give an idea
… this code does not take care of late events (you would need to use a MapState keyed by windowEndTime instead)
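
To make the MapState idea a bit more concrete, here is a rough plain-Java model of the state for a single key. It is only a sketch under assumptions: the class and method names (PerKeyWindows, windowEnd, onEvent, onTimer) are made up for illustration, a TreeMap stands in for a MapState keyed by window end, and no Flink API is involved. It only covers out-of-order events that arrive before their window's timer fires; events arriving after that would still need separate handling.

```java
import java.util.TreeMap;

/** Plain-Java model of one key's state in the manual windowing sketch (no Flink dependency). */
final class PerKeyWindows {
    private final long windowSizeMs;
    // Stand-in for a MapState: window end timestamp -> number of events in that window.
    private final TreeMap<Long, Long> countsByWindowEnd = new TreeMap<>();

    PerKeyWindows(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** End (exclusive) of the tumbling window containing the timestamp; assumes timestampMs >= 0. */
    static long windowEnd(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs) + windowSizeMs;
    }

    /** Mirrors processElement: count the event in the window it belongs to, even if it is out of order. */
    void onEvent(long eventTimestampMs) {
        countsByWindowEnd.merge(windowEnd(eventTimestampMs, windowSizeMs), 1L, Long::sum);
    }

    /** Mirrors onTimer for scenario A): return the count of the window that just closed, 0 if it was empty. */
    long onTimer(long firedWindowEnd) {
        Long count = countsByWindowEnd.remove(firedWindowEnd);
        return count == null ? 0L : count;
    }
}
```

With a 1000 ms window, events at 100, 1500 and 900 give a count of 2 for the window ending at 1000, 1 for the window ending at 2000, and 0 for the window ending at 3000.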

What do you think…?

Thias




Re: Notify on 0 events in a Tumbling Event Time Window

Posted by Shilpa Shankar <ss...@bandwidth.com>.
Thanks Andrew.
We did consider this solution too. Unfortunately, we do not have permission
to generate artificial Kafka events in our ecosystem.

Dario,
Thanks for your input. We will give your design a try. Due to the number of
events being processed per window, we are using an incremental aggregate
function:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#processwindowfunction-with-incremental-aggregation
Do you think we can use KeyedCoProcessFunction in this design?
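
For reference, the point of the incremental aggregation is that only a small accumulator is kept per key and window instead of every event. A minimal plain-Java sketch of such an accumulator follows; the class name StatsAccumulator and its fields are hypothetical, it assumes numeric event values, and it is not tied to any Flink interface.

```java
/** Running aggregate with constant-size state, independent of how many events a window holds. */
final class StatsAccumulator {
    long count = 0;
    double sum = 0.0;
    double min = Double.POSITIVE_INFINITY;
    double max = Double.NEGATIVE_INFINITY;

    /** Folds one event value into the running aggregate. */
    void add(double value) {
        count++;
        sum += value;
        min = Math.min(min, value);
        max = Math.max(max, value);
    }

    /** True when no event was added, i.e. the 0-events case we want to notify on. */
    boolean isEmpty() {
        return count == 0;
    }

    /** Average of the added values; undefined for an empty window, so it fails loudly. */
    double average() {
        if (count == 0) {
            throw new IllegalStateException("no events in window");
        }
        return sum / count;
    }
}
```

An accumulator like this could in principle be held in keyed state, so the 0-events case would simply be the accumulator that was never added to.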

Thanks,
Shilpa

Re: Notify on 0 events in a Tumbling Event Time Window

Posted by Dario Heinisch <da...@gmail.com>.
It depends on the use case; in Shilpa's use case it is about users, so
the user IDs are probably known beforehand.

https://dpaste.org/cRe3G <= This is an example without a window, but
essentially, Shilpa, you would be re-registering the timers every time they
fire.
You would also have to ingest the user IDs into your pipeline beforehand,
so that a user who never has any events still gets a notification. So
probably ingest the user IDs on startup with a single source from the DB.

My example is pretty minimal, but the idea in your case stays the same:

- key by user
- have a co-process function to init the state with the user IDs
- re-register the timers every time they fire
- use `env.getConfig().setAutoWatermarkInterval(1000)` to move the event
time forward even if there is no data coming in (this is what you are
probably looking for!!)
- then collect an Optional, a custom struct, null, or similar, depending on
whether data is present or not
- and then you can check whether the event was triggered because there was
data or because there wasn't
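
One caveat on the watermark bullet: the auto-watermark interval only controls how often the periodic watermark generator is asked for a watermark (the `onPeriodicEmit` method of Flink's `WatermarkGenerator`). A generator that derives the watermark purely from the largest event timestamp seen so far will not advance it while no data arrives, so the generator itself has to push time forward when the input is quiet. Below is a plain-Java model of that logic, not Flink code: the class name IdleAdvancingWatermark, the idle timeout, and passing the wall clock in as a parameter are all assumptions made for illustration.

```java
/** Plain-Java model of a periodic watermark generator that keeps event time moving while the input is idle. */
final class IdleAdvancingWatermark {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private long maxEventTimestampMs = Long.MIN_VALUE; // largest event timestamp seen so far
    private long lastEventWallClockMs = 0L;            // wall-clock time of the most recent event
    private long lastEmittedMs = Long.MIN_VALUE;       // keeps the emitted watermark monotonic

    IdleAdvancingWatermark(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Called for every event: remember the largest timestamp and when we last saw data. */
    void onEvent(long eventTimestampMs, long wallClockMs) {
        maxEventTimestampMs = Math.max(maxEventTimestampMs, eventTimestampMs);
        lastEventWallClockMs = wallClockMs;
    }

    /** Called once per auto-watermark interval: returns the watermark to emit. */
    long onPeriodicEmit(long wallClockMs) {
        if (maxEventTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, so there is no event time to advance
        }
        long candidate = maxEventTimestampMs - maxOutOfOrdernessMs;
        long idleForMs = wallClockMs - lastEventWallClockMs;
        if (idleForMs > idleTimeoutMs) {
            // No data for a while: advance event time by the wall-clock time spent idle.
            candidate += idleForMs - idleTimeoutMs;
        }
        lastEmittedMs = Math.max(lastEmittedMs, candidate);
        return lastEmittedMs;
    }
}
```

The trade-off is that events which show up after a quiet period with old timestamps may fall behind the advanced watermark and be treated as late, so the idle timeout would have to be chosen with the expected delays in mind.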

Best regards,

Dario


Re: Notify on 0 events in a Tumbling Event Time Window

Posted by Andrew Otto <ot...@wikimedia.org>.
This sounds similar to a non-streaming problem we had at WMF.  We ingest
all event data from Kafka into HDFS/Hive and partition the Hive tables in
hourly directories.  If there are no events in a Kafka topic for a given
hour, we have no way of knowing if the hour has been ingested
successfully.  For all we know, the upstream producer pipeline might be
broken.

We solved this by emitting artificial 'canary' events into each topic
multiple times an hour.  The canary events producer uses the same code
pathways and services that (most of) our normal event producers do.  Then,
when ingesting into Hive, we filter out the canary events.  The ingestion
code has work to do and can mark an hour as complete, but still end up
writing no events to it.

Perhaps you could do the same?  Always emit artificial events, and filter
them out in your windowing code? The window should still fire since it will
always have events, even if you don't use them?
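
As a rough illustration of the filtering step, here is a plain-Java sketch. The Event class with its canary flag and the countReal helper are hypothetical names, not something from our actual pipeline or from Flink.

```java
import java.util.List;

/** Sketch of counting only real events in a window that also contains canary events. */
final class CanaryFilter {

    /** Hypothetical event shape: a key plus a flag set by the canary producer. */
    static final class Event {
        final String key;
        final boolean canary;

        Event(String key, boolean canary) {
            this.key = key;
            this.canary = canary;
        }
    }

    /** Canaries only make sure the window fires; they are excluded from the result. */
    static long countReal(List<Event> windowContents) {
        return windowContents.stream().filter(e -> !e.canary).count();
    }
}
```

A window that holds only canaries then yields a count of 0, which would be the signal to notify on.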



