Posted to user@flink.apache.org by Maciek Próchniak <mp...@touk.pl> on 2016/09/23 08:36:17 UTC

window-like use case

Hi,

in our project we're dealing with a stream of billing events. Each has a
customerId and a charge amount.
We want to have a process that will trigger an event (alarm) when the sum of
charges for a customer during the last 4 hours exceeds a certain threshold,
say 10.
The triggered event should contain data from the last billing event (the one
that triggered the alarm).

On one hand we can implement it as custom state - we'd save charges (or
some precomputed aggregates) from the last 4 hours and trigger the event when
a new one arrives.
OTOH we've been wondering whether we can make it easier by using Flink windows.
We tried to model our situation as sliding windows (of length 4 hours,
slide 1h), with some precomputed aggregate and a custom trigger
that fires on an element when the threshold is exceeded.
It kinda works, except that the state is unnecessarily large, the
custom trigger is a bit hacky and (worst of all) when an event with a charge
amount of e.g. 20 arrives, the trigger fires in all slides and we get
duplicated events.
That's why we're currently thinking about implementing it with custom state...
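
A minimal sketch of why the alarm is duplicated, in plain Java with no Flink dependency: with a 4-hour window sliding every hour, each event falls into four overlapping windows, so a per-window trigger can fire four times for the same charge. The class and method names are hypothetical, and aligning window starts to multiples of the slide is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists the sliding windows that contain one timestamp.
class SlidingWindowOverlap {
    static final long HOUR_MS = 60L * 60L * 1000L;

    /** Start times of every window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide (assumption).
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long eventTime = 5 * HOUR_MS + 30 * 60 * 1000L; // an event at 05:30
        List<Long> starts = windowStarts(eventTime, 4 * HOUR_MS, HOUR_MS);
        // A single charge of 20 exceeds the threshold of 10 in every window it belongs to.
        System.out.println("windows containing the event: " + starts.size());
        for (long start : starts) {
            System.out.println("  window starting at hour " + start / HOUR_MS);
        }
    }
}
```

For the event at 05:30 this lists the windows starting at hours 5, 4, 3 and 2, which matches the four duplicated alarms.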

Do you have any other ideas/recommendations on how we can handle such a
requirement?

thanks,
maciek

Re: window-like use case

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks for the update! Let us know if you need anything.

On Fri, 4 Nov 2016 at 20:43 Maciek Próchniak <mp...@touk.pl> wrote:

> Hi Aljoscha,
>
> I know it's tricky...
>
> A few weeks ago we decided to implement it without windows, using just a
> stateful operator and some queues/maps per key as state - so yeah, we tried
> to imagine how to do this in plain Java and one stream ;)
>
> We also process watermarks to evict old events. Fortunately, our streams
> are not that big and we can keep all "recent" events in state - without
> preaggregation.
>
> Currently we're waiting for some feedback from our client on results - if
> it's ok, we'll stick with that, otherwise we'll have to look into it
> deeper...
>
> thanks,
>
> maciek

Re: window-like use case

Posted by Maciek Próchniak <mp...@touk.pl>.
Hi Aljoscha,

I know it's tricky...

A few weeks ago we decided to implement it without windows, using just a
stateful operator and some queues/maps per key as state - so yeah, we
tried to imagine how to do this in plain Java and one stream ;)

We also process watermarks to evict old events. Fortunately, our streams 
are not that big and we can keep all "recent" events in state - without 
preaggregation.
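
The approach described above could look roughly like the following plain-Java sketch. It is not the actual implementation from this thread and does not use the Flink API; the class and method names, the TreeMap per customer, and the choice to check only the arriving event's own 4-hour range are assumptions. Late events (older than the watermark) are not handled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical names throughout; a plain-Java stand-in for a keyed stateful operator.
class ChargeAlarmState {
    static final long WINDOW_MS = 4L * 60 * 60 * 1000;

    private final double threshold;
    // Per customer: event timestamp -> total charged at that timestamp, sorted by time.
    private final Map<String, TreeMap<Long, Double>> recentByCustomer = new HashMap<>();

    ChargeAlarmState(double threshold) {
        this.threshold = threshold;
    }

    /** Stores the charge and reports whether the trailing 4h sum now exceeds the threshold. */
    boolean onEvent(String customerId, long timestamp, double amount) {
        TreeMap<Long, Double> recent =
                recentByCustomer.computeIfAbsent(customerId, k -> new TreeMap<>());
        recent.merge(timestamp, amount, Double::sum);
        double sum = 0.0;
        // Charges in (timestamp - 4h, timestamp]; the sorted map tolerates out-of-order arrival.
        for (double charge : recent.subMap(timestamp - WINDOW_MS, false, timestamp, true).values()) {
            sum += charge;
        }
        return sum > threshold;
    }

    /** Drops charges that no event after the watermark can still include in its 4h range. */
    void onWatermark(long watermark) {
        for (TreeMap<Long, Double> recent : recentByCustomer.values()) {
            recent.headMap(watermark - WINDOW_MS, true).clear();
        }
        recentByCustomer.values().removeIf(Map::isEmpty);
    }

    /** Number of distinct timestamps still held for a customer (for inspection only). */
    int storedTimestamps(String customerId) {
        TreeMap<Long, Double> recent = recentByCustomer.get(customerId);
        return recent == null ? 0 : recent.size();
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        ChargeAlarmState state = new ChargeAlarmState(10.0);
        System.out.println(state.onEvent("c1", 1 * hour, 6.0)); // false: sum is 6
        System.out.println(state.onEvent("c1", 4 * hour, 5.0)); // true: sum is 11
        state.onWatermark(8 * hour);
        System.out.println(state.storedTimestamps("c1"));       // 0: both charges evicted
    }
}
```

Because onEvent reports the result for the event just passed in, the caller still has that event at hand and can copy its data into the alarm.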

Currently we're waiting for some feedback from our client on the results -
if it's ok, we'll stick with that, otherwise we'll have to look into it
more deeply...

thanks,

maciek


On 25/10/2016 16:41, Aljoscha Krettek wrote:
> Hi Maciek,
> cases like this, where you essentially want to evict elements that are
> older than a certain threshold while keeping a count of those elements
> that are not older than that threshold, tend to be quite tricky.
>
> In order to start thinking about this: how would you implement this
> case in a non-parallel way, in plain Java? You have the stream of
> incoming events, they possibly have timestamps, they are possibly not
> ordered by that timestamp (this depends on your use case). Now, what
> are the algorithms/data structures that could be used for computing
> the result that you require?
>
> Cheers,
> Aljoscha


Re: window-like use case

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Maciek,
cases like this, where you essentially want to evict elements that are
older than a certain threshold while keeping a count of those elements that
are not older than that threshold, tend to be quite tricky.

In order to start thinking about this: how would you implement this case in
a non-parallel way, in plain Java? You have the stream of incoming events,
they possibly have timestamps, they are possibly not ordered by that
timestamp (this depends on your use case). Now, what are the
algorithms/data structures that could be used for computing the result that
you require?
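
One possible answer for the simplest case, a single customer whose events arrive in timestamp order, is a queue plus a running sum. This is only a sketch under that ordering assumption; the class name and the use of whole-number amounts are hypothetical, and out-of-order input would need a sorted structure instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-customer, single-threaded sketch; assumes timestamps never decrease.
class TrailingSum {
    private final long windowMs;
    private final Deque<long[]> charges = new ArrayDeque<>(); // each entry: {timestamp, amount}
    private long sum = 0;

    TrailingSum(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Adds a charge and returns the sum over (timestamp - windowMs, timestamp]. */
    long add(long timestamp, long amount) {
        // Evict charges that have fallen out of the range, keeping the running sum in step.
        while (!charges.isEmpty() && charges.peekFirst()[0] <= timestamp - windowMs) {
            sum -= charges.pollFirst()[1];
        }
        charges.addLast(new long[] {timestamp, amount});
        sum += amount;
        return sum;
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        long threshold = 10;
        TrailingSum perCustomer = new TrailingSum(4 * hour);
        long[][] events = {{1 * hour, 6}, {4 * hour, 5}, {5 * hour, 4}};
        for (long[] e : events) {
            long total = perCustomer.add(e[0], e[1]);
            System.out.println("t=" + e[0] / hour + "h sum=" + total
                    + " alarm=" + (total > threshold));
        }
    }
}
```

With the three sample charges the sums are 6, 11 and 9, so only the second event raises the alarm; by the third event the first charge has dropped out of the 4-hour range.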

Cheers,
Aljoscha

On Fri, 23 Sep 2016 at 10:50 Claudia Wegmann <c....@kasasi.de> wrote:

> Hey,
>
> I'm no expert at all, but to me this sounds like a use case for Complex
> Event Processing (CEP). I don't know if you're aware of Flink's CEP library
> [1, 2]? Maybe that solves your problem of multiple firings. But best to
> wait for the experts to answer your questions on handling state and firing
> windows :)
>
> Best,
> Claudia
>
> [1]: https://flink.apache.org/news/2016/04/06/cep-monitoring.html
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html

Re: window-like use case

Posted by Claudia Wegmann <c....@kasasi.de>.
Hey,

I'm no expert at all, but to me this sounds like a use case for Complex Event Processing (CEP). I don't know if you're aware of Flink's CEP library [1, 2]? Maybe that solves your problem of multiple firings. But best to wait for the experts to answer your questions on handling state and firing windows :)

Best,
Claudia

[1]: https://flink.apache.org/news/2016/04/06/cep-monitoring.html
[2]: https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html

