Posted to user@flink.apache.org by Yassine MARZOUGUI <y....@mindlytix.com> on 2017/03/07 18:23:33 UTC

Appropriate State to use to buffer events in ProcessFunction

Hi all,

I want to label events in a stream based on a condition on some future
events.
For example, my stream contains events of type A and B, and I would like
to assign a label 1 to an event E of type A if an event of type B happens
within a duration x of E. I am using event time and my events can be out of
order.
For this I'm using ProcessFunction, which looks suitable for my use case. In
order to handle out-of-order events, I'm keeping events of type A in
state, and once an event of type B is received, I register an event-time
timer. When it fires, I loop through the events of type A in the state whose
timestamps are < timer.timestamp, label them and remove them from the state.
Currently the state is simply a value state containing a TreeMap<Timestamp,
EventA>. I'm keeping events sorted in order to efficiently get events older
than the timer timestamp.
I wonder if that's the appropriate data structure to use in the value state
to buffer events and be able to handle out-of-orderness, or if there is a
more efficient implementation, especially since the state may grow to reach
~100 GB sometimes?
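
To make the buffering logic concrete, here is a minimal sketch in plain Java,
without any Flink dependency. The class name SortedBuffer, the String event
type and the ":1" label suffix are assumptions made for illustration only. In
a real job the map would be held in keyed state and the drain would run in the
timer callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the TreeMap kept inside a value state.
public class SortedBuffer {
    // Buffered type-A events keyed by event timestamp (milliseconds).
    // Caveat: keying by timestamp keeps only one event per timestamp.
    private final TreeMap<Long, String> pending = new TreeMap<>();

    public void add(long timestamp, String eventA) {
        pending.put(timestamp, eventA);
    }

    // Labels and removes every buffered event with timestamp < timerTimestamp.
    public List<String> drainOlderThan(long timerTimestamp) {
        // headMap(..., false) is a live view of the strictly smaller keys.
        NavigableMap<Long, String> older = pending.headMap(timerTimestamp, false);
        List<String> labeled = new ArrayList<>();
        for (String event : older.values()) {
            labeled.add(event + ":1"); // assumed label format
        }
        older.clear(); // clearing the view removes the entries from 'pending'
        return labeled;
    }

    public int size() {
        return pending.size();
    }
}
```

The lookup is cheap because the map is sorted, but when the whole TreeMap
sits in a single value state it is read and written as one object, which is
the cost that matters once the state becomes large.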

Any insight is appreciated.

Thanks,
Yassine

Re: Appropriate State to use to buffer events in ProcessFunction

Posted by Yassine MARZOUGUI <y....@mindlytix.com>.
Hi Xiaogang,

Indeed, MapState is what I was looking for in order to have efficient
sorted state, as it would facilitate many use cases like this one, or
joining streams, etc. I searched a bit and found your contribution
<https://github.com/apache/flink/pull/3336> of MapState for the upcoming 1.3
release. I'll see how it works for me.
Thank you for pointing this out, very helpful!

Best,
Yassine

2017-03-16 18:50 GMT+01:00 SHI Xiaogang <sh...@gmail.com>:

> Hi Yassine,
>
> If I understand correctly, you need sorted state, which unfortunately
> is not supported in Flink at the moment.
> We have some ideas for providing sorted state to facilitate the
> development of user applications, but they are still under discussion due
> to concerns about backward compatibility.
>
> For now, I think we can work around the problem with MapState in the
> RocksDB state backend.
> In the RocksDB state backend, each entry in a MapState corresponds to an
> entry in RocksDB. The key of a RocksDB entry is formatted as
> "keyGroup#key#keyLen#namespace#namespaceLen#mapKey".
>
> The entries in RocksDB are sorted in lexicographical order. When the map
> keys are of type Timestamp/Long, the entries in the MapState are iterated
> in the same order as in a sorted map. Thus, you can find all the events
> whose timestamps are smaller than a given one.
>
> The solution is quite tricky because it does not work with the heap
> state backends. But given that the state may grow up to ~100 GB, the
> RocksDB state backend is strongly recommended.
>
> I hope this information helps.
>
> Regards,
> Xiaogang

Re: Appropriate State to use to buffer events in ProcessFunction

Posted by SHI Xiaogang <sh...@gmail.com>.
Hi Yassine,

If I understand correctly, you need sorted state, which unfortunately
is not supported in Flink at the moment.
We have some ideas for providing sorted state to facilitate the
development of user applications, but they are still under discussion due
to concerns about backward compatibility.

For now, I think we can work around the problem with MapState in the
RocksDB state backend.
In the RocksDB state backend, each entry in a MapState corresponds to an
entry in RocksDB. The key of a RocksDB entry is formatted as
"keyGroup#key#keyLen#namespace#namespaceLen#mapKey".

The entries in RocksDB are sorted in lexicographical order. When the map
keys are of type Timestamp/Long, the entries in the MapState are iterated
in the same order as in a sorted map. Thus, you can find all the events
whose timestamps are smaller than a given one.

The solution is quite tricky because it does not work with the heap
state backends. But given that the state may grow up to ~100 GB, the
RocksDB state backend is strongly recommended.
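
The ordering argument above can be checked with a small, self-contained
sketch in plain Java. It assumes that a long map key is serialized as 8
big-endian bytes and that the store compares keys byte by byte as unsigned
values. Both are assumptions for illustration, as are the names LexOrderDemo,
encode and iterationOrder. Nothing here calls Flink or RocksDB.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical demo: a TreeMap with a bytewise comparator stands in for the store.
public class LexOrderDemo {
    // Unsigned byte-by-byte comparison, i.e. lexicographical order on raw keys.
    static final Comparator<byte[]> BYTEWISE = (x, y) -> {
        int n = Math.min(x.length, y.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(x[i] & 0xFF, y[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(x.length, y.length);
    };

    // Assumed key encoding: 8 bytes, big-endian (ByteBuffer's default order).
    static byte[] encode(long timestamp) {
        return ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
    }

    // Returns the timestamps in the order a bytewise-sorted store would iterate them.
    static List<Long> iterationOrder(long... timestamps) {
        TreeMap<byte[], Long> store = new TreeMap<>(BYTEWISE);
        for (long ts : timestamps) {
            store.put(encode(ts), ts);
        }
        return new ArrayList<>(store.values());
    }
}
```

Under these assumptions, non-negative timestamps come back in numeric order,
while a negative value sorts after all positive ones because its leading byte
has the sign bit set. Event timestamps in epoch milliseconds are normally
non-negative, so this mostly matters if negative keys can ever occur.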

I hope this information helps.

Regards,
Xiaogang

2017-03-09 23:19 GMT+08:00 Yassine MARZOUGUI <y....@mindlytix.com>:

> Hi Timo,
>
> I thought about ListState but quickly discarded it, as it keeps the
> insertion order and not the event order. On second thought, I think I will
> reconsider it, since my events are only occasionally out of order. I didn't
> know that the Flink CEP operators 'next' and 'within' can handle event
> time, so I think I will give it a try! Thank you!
>
> Best,
> Yassine

Re: Appropriate State to use to buffer events in ProcessFunction

Posted by Yassine MARZOUGUI <y....@mindlytix.com>.
Hi Timo,

I thought about ListState but quickly discarded it, as it keeps the
insertion order and not the event order. On second thought, I think I will
reconsider it, since my events are only occasionally out of order. I didn't
know that the Flink CEP operators 'next' and 'within' can handle event
time, so I think I will give it a try! Thank you!

Best,
Yassine

2017-03-08 9:55 GMT+01:00 Timo Walther <tw...@apache.org>:

> Hi Yassine,
>
> Have you thought about using a ListState? As far as I know, it keeps at
> least the insertion order. You could sort it once your trigger event has
> arrived.
> If you use RocksDB as the state backend, 100+ GB of state should not be a
> problem. Have you thought about using Flink's CEP library? It might fit
> your needs without implementing a custom process function.
>
> I hope that helps.
>
> Timo

Re: Appropriate State to use to buffer events in ProcessFunction

Posted by Timo Walther <tw...@apache.org>.
Hi Yassine,

Have you thought about using a ListState? As far as I know, it keeps at
least the insertion order. You could sort it once your trigger event has
arrived.
If you use RocksDB as the state backend, 100+ GB of state should not be a
problem. Have you thought about using Flink's CEP library? It might fit
your needs without implementing a custom process function.
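
As a rough illustration of the sort-on-trigger idea, here is a minimal sketch
in plain Java. An ArrayList stands in for an insertion-ordered list state,
and the names SortOnTrigger, EventA and onTrigger are hypothetical. It does
not use any Flink API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for buffering in an insertion-ordered list state.
public class SortOnTrigger {
    public static final class EventA {
        final long timestamp;
        final String id;

        public EventA(long timestamp, String id) {
            this.timestamp = timestamp;
            this.id = id;
        }
    }

    // Events are appended in arrival order, which may differ from event-time order.
    private final List<EventA> buffer = new ArrayList<>();

    public void add(EventA event) {
        buffer.add(event);
    }

    // On the trigger: sort once by event time, then emit and drop everything
    // strictly older than the trigger timestamp.
    public List<String> onTrigger(long triggerTimestamp) {
        buffer.sort(Comparator.comparingLong(e -> e.timestamp));
        List<String> matchedIds = new ArrayList<>();
        Iterator<EventA> it = buffer.iterator();
        while (it.hasNext()) {
            EventA event = it.next();
            if (event.timestamp >= triggerTimestamp) {
                break; // sorted, so nothing older can follow
            }
            matchedIds.add(event.id);
            it.remove();
        }
        return matchedIds;
    }

    public int remaining() {
        return buffer.size();
    }
}
```

The trade-off is that appends stay cheap and the sorting cost is paid only
when a trigger arrives, which tends to suit streams that are mostly in order.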

I hope that helps.

Timo


On 07/03/17 at 19:23, Yassine MARZOUGUI wrote:
> Hi all,
>
> I want to label events in a stream based on a condition on some future
> events.
> For example, my stream contains events of type A and B, and I would
> like to assign a label 1 to an event E of type A if an event of type B
> happens within a duration x of E. I am using event time and my events
> can be out of order.
> For this I'm using ProcessFunction, which looks suitable for my use
> case. In order to handle out-of-order events, I'm keeping events of
> type A in state, and once an event of type B is received, I register an
> event-time timer. When it fires, I loop through the events of type A in
> the state whose timestamps are < timer.timestamp, label them and remove
> them from the state.
> Currently the state is simply a value state containing a
> TreeMap<Timestamp, EventA>. I'm keeping events sorted in order to
> efficiently get events older than the timer timestamp.
> I wonder if that's the appropriate data structure to use in the value
> state to buffer events and be able to handle out-of-orderness, or if
> there is a more efficient implementation, especially since the state
> may grow to reach ~100 GB sometimes?
>
> Any insight is appreciated.
>
> Thanks,
> Yassine
>
>
>