Posted to user@flink.apache.org by oleg <ol...@gmail.com> on 2020/02/11 20:15:44 UTC

Aggregation for last n seconds for each event

Hi Community,

I am streaming in event time and I want to preserve ordering and late
events. I have a use case where I need to fire an aggregation function
over the events of the last n seconds (or time units in general) for every
incoming event.

It seems to me that windowing is not suitable, since a window can be
defined either by time or by event count, but not as "the last n seconds
for each single event".

Is there an idiomatic way to do this? Any examples or help are 
appreciated. Thanks in advance.


Best regards,

Oleg Bonar


Re: Aggregation for last n seconds for each event

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Oleg,

With the MapState approach you can always fire on every
incoming element :)
You just iterate over the map state and find all the elements whose
timestamp (key) lies between NOW-N and the timestamp of the current
element (NOW).
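
A minimal sketch of that per-element lookup, in plain Java rather than Flink API. It assumes millisecond timestamps, a sum as the aggregation, and a TreeMap standing in for the per-key state; the names LastNSecondsSum and onEvent are hypothetical. Flink's MapState has no range query, so in a real job you would iterate over its entries and filter by key instead of calling subMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java stand-in for the per-key state; not Flink API.
// All names here (LastNSecondsSum, onEvent) are hypothetical.
class LastNSecondsSum {
    // timestamp (ms) -> values seen with that timestamp
    private final TreeMap<Long, List<Double>> byTimestamp = new TreeMap<>();
    private final long windowMillis;

    LastNSecondsSum(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Store the event, then sum every value whose timestamp lies in [now - N, now].
    double onEvent(long now, double value) {
        byTimestamp.computeIfAbsent(now, t -> new ArrayList<>()).add(value);
        double sum = 0.0;
        for (List<Double> values
                : byTimestamp.subMap(now - windowMillis, true, now, true).values()) {
            for (double v : values) {
                sum += v;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        LastNSecondsSum agg = new LastNSecondsSum(5_000L);
        System.out.println(agg.onEvent(1_000L, 1.0)); // only the event itself
        System.out.println(agg.onEvent(4_000L, 2.0)); // both events are in range
        System.out.println(agg.onEvent(7_000L, 4.0)); // the event at 1_000 is out of range
    }
}
```

Because the range is computed from each event's own timestamp, an out-of-order event is aggregated against the elements within N of its own timestamp, not against the newest elements. This sketch never removes old entries, so state grows without bound.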

Anyway, if Fanbin's solution works, then you can always use that!

Cheers,
Kostas

On Wed, Feb 12, 2020 at 7:18 PM Олег Бонарь <ol...@gmail.com> wrote:
>
> Hi Kostas,
>
> Thanks for your reply!
> Yes, you understand me correctly. However, I also want the stream to be keyed so that it is processed in parallel. I'm afraid the MapState approach you suggested doesn't really suit my use case, because I need to fire on every incoming event.
> Logically, Fanbin's "RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW" looks 100% like what I need, but I haven't tried it yet.
> I'm also wondering whether it can be expressed in the DataStream API.
>

Re: Aggregation for last n seconds for each event

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Oleg,

Could you be more specific about what you mean by
"for events of last n seconds(time units in general) for every incoming event."?

Do you mean that you have a stream with parallelism 1 and that, for
each incoming element, you want your function to fire with the event
itself and all the events that arrived within the last N time units?
If so, you can key your stream by a dummy key to get access to keyed
state, and then use MapState with the timestamp as the key and a list
of the elements already seen with that timestamp as the value. Whenever
an element arrives, you can register a timer to fire N time units in
the future. When the timer fires, you can iterate over the map, fetch
the elements you are interested in, and clean up whatever you no
longer need.
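
The clean-up part of this can be sketched as follows. This is a plain-Java simulation, not Flink API: a TreeMap stands in for the MapState, a TreeSet for the registered event-time timers, and advanceWatermark for the runtime firing them. The class and method names are hypothetical, and the sketch assumes that a watermark W means no on-time element with a timestamp of W or less will still arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java simulation of keyed state plus event-time timers; not Flink API.
// Class and method names are hypothetical.
class TimedLookbackState {
    private final TreeMap<Long, List<Double>> byTimestamp = new TreeMap<>();
    private final TreeSet<Long> timers = new TreeSet<>();
    private final long windowMillis;

    TimedLookbackState(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Buffer the element and register a clean-up timer N time units ahead.
    void onElement(long timestamp, double value) {
        byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        timers.add(timestamp + windowMillis);
    }

    // Fire every timer at or below the watermark, mimicking event-time timers.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    // Entries at or before (timer - N) can no longer fall into the look-back
    // range of any on-time element, so they are dropped.
    private void onTimer(long timerTimestamp) {
        byTimestamp.headMap(timerTimestamp - windowMillis, true).clear();
    }

    int bufferedTimestamps() {
        return byTimestamp.size();
    }

    public static void main(String[] args) {
        TimedLookbackState state = new TimedLookbackState(5_000L);
        state.onElement(1_000L, 1.0);
        state.onElement(4_000L, 2.0);
        state.advanceWatermark(6_000L); // timer for the element at 1_000 fires
        System.out.println(state.bufferedTimestamps()); // one timestamp left
    }
}
```

In a real job the timer callback would live in a KeyedProcessFunction, and late elements arriving after the clean-up would see an incomplete look-back range unless entries are retained longer.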

For an example you could look at [1].

I hope this helps,
Kostas

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html


Re: Aggregation for last n seconds for each event

Posted by Fanbin Bu <fa...@coinbase.com>.
Can you use
RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW?
