Posted to user@flink.apache.org by Simone Robutti <si...@radicalbit.io> on 2016/09/29 22:32:19 UTC

Counting latest state of stateful entities in streaming

Hello,

in the last few days I tried to create my first real-time analytics job in
Flink. The approach is kappa-architecture-like, so I have my raw data on
Kafka, where we receive a message for every change of state of any entity.

So the messages are of the form

(id, newStatus, timestamp)

We want to compute, for every time window, the count of items in a given
status. So the output should be of the form

(outputTimestamp, state1:count1, state2:count2, ...)

or equivalent. These rows should contain, at any given time, the count of
the items in a given status, where the status associated with an id is the
one from the most recent message observed for that id. The status of an id
should be counted in any case, even if the event is way older than those
currently being processed. So the sum of all the counts should be equal to
the number of different ids observed in the system. A following step could
be to forget items that reached a final state after a while, but this is
not a strict requirement right now.

This will be written to Elasticsearch and then queried.

I tried many different paths and none of them completely satisfied the
requirement. Using a sliding window I could easily achieve the expected
behaviour, except that when the beginning of the sliding window surpassed
the timestamp of an event, that event was lost for the count, as you may
expect. Other approaches failed to be consistent when working with a
backlog, because I used some tricks with keys and timestamps that broke
when the data was processed all at once.

So I would like to know, even at a high level, how I should approach this
problem. It looks like a relatively common use case, but the fact that the
relevant information for a given id must be retained indefinitely to count
the entities correctly creates a lot of problems.

Thank you in advance,

Simone

Re: Counting latest state of stateful entities in streaming

Posted by Fabian Hueske <fh...@gmail.com>.
This works with event time as well. You need to set the right
TimeCharacteristic on the execution environment and assign timestamps and
watermarks. The only time-dependent operation is the window.
YourWindowFunction assigns the timestamp of the window.
WindowFunction.apply() has a TimeWindow parameter that gives access to the
window's start and end time. See the docs [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation
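
For example, a minimal event-time setup could look like this (a sketch
assuming the (id, state, time) stream s from my other mail; the 10 second
out-of-orderness bound is just a guess you should tune to your data):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// extract the event timestamp from the third tuple field and allow events
// to be up to 10 seconds out of order
val withTimestamps = s.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[(Long, Int, Long)](Time.seconds(10)) {
    override def extractTimestamp(element: (Long, Int, Long)): Long = element._3
  })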

Re: Counting latest state of stateful entities in streaming

Posted by Simone Robutti <si...@radicalbit.io>.
I'm working with your suggestions, thank you very much. What I'm missing
here is what YourWindowFunction should do. I have no notion of event time
there, so I can't assign a timestamp. Also, this solution seems to work on
processing time, while I care about event time. I couldn't make it run yet,
but from what I understand, this is slightly different from what I need.

Re: Counting latest state of stateful entities in streaming

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Simone,

I think I have a solution for your problem:

val s: DataStream[(Long, Int, Long)] = ??? // (id, state, time)

val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
  .keyBy(_._1) // key by id
  .flatMap(new StateUpdater)
  // StateUpdater is a stateful FlatMapFunction. It has a keyed state that
  // stores the last state of each id. For each input record it returns two
  // records: (oldState, -1), (newState, +1)

stateChanges ensures that counts of previous states are subtracted.
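
A minimal sketch of what StateUpdater could look like (the state name and
the handling of ids seen for the first time are my choices):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// keyed by id: remembers the last state of each id and emits a retraction
// for the old state plus an increment for the new one
class StateUpdater extends RichFlatMapFunction[(Long, Int, Long), (Int, Int)] {

  // last state seen for the current id; null if the id is new
  private var lastState: ValueState[Integer] = _

  override def open(parameters: Configuration): Unit = {
    lastState = getRuntimeContext.getState(
      new ValueStateDescriptor[Integer]("lastState", classOf[Integer]))
  }

  override def flatMap(in: (Long, Int, Long), out: Collector[(Int, Int)]): Unit = {
    val old = lastState.value()
    if (old != null) {
      out.collect((old, -1)) // subtract the id from its previous state
    }
    lastState.update(in._2)
    out.collect((in._2, +1)) // add the id to its new state
  }
}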

val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges // (state, cntUpdate, time)
  .keyBy(_._1) // key by state
  .window(...) // your window; should be non-overlapping, so go for instance for tumbling windows
  .apply(new SumReducer(), new YourWindowFunction())
  // SumReducer sums the cntUpdates and YourWindowFunction assigns the
  // timestamp of your window

this step aggregates all state changes for each state in a window
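
SumReducer and YourWindowFunction could be sketched as follows (using the
window's end time as the output timestamp is one possible choice):

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// incrementally sums the cntUpdates of one state within the window
class SumReducer extends ReduceFunction[(Int, Int)] {
  override def reduce(a: (Int, Int), b: (Int, Int)): (Int, Int) =
    (a._1, a._2 + b._2)
}

// receives the single pre-aggregated record per state and window and
// attaches the window's end time to it
class YourWindowFunction
    extends WindowFunction[(Int, Int), (Int, Int, Long), Int, TimeWindow] {
  override def apply(state: Int, window: TimeWindow,
                     input: Iterable[(Int, Int)],
                     out: Collector[(Int, Int, Long)]): Unit = {
    val (st, cnt) = input.iterator.next()
    out.collect((st, cnt, window.getEnd))
  }
}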

val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow // (state, count, time)
  .keyBy(_._1) // key by state again
  .map(new CountUpdater)
  // CountUpdater is a stateful MapFunction. It has a keyed state that
  // stores the current count. For each incoming record, the count is
  // adjusted and a record (state, newCount, time) is emitted.
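
A sketch of CountUpdater along the same lines:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

// keyed by state: keeps the running count per state and applies each
// windowed delta to it
class CountUpdater extends RichMapFunction[(Int, Int, Long), (Int, Int, Long)] {

  private var count: ValueState[Integer] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[Integer]("count", classOf[Integer]))
  }

  override def map(in: (Int, Int, Long)): (Int, Int, Long) = {
    val current = if (count.value() == null) 0 else count.value().toInt
    val newCount = current + in._2
    count.update(newCount)
    (in._1, newCount, in._3) // (state, newCount, time)
  }
}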

Now you have the new counts for your states in multiple records. If
possible, you can update your Elasticsearch index using these. Otherwise,
you have to collect them into one record using another window.
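
For the latter, a hypothetical sketch (the window size must match the first
window so that all counts with the same timestamp end up in one record):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// group all per-state counts that carry the same output timestamp into a
// single (timestamp, state -> count) record
val perTimestamp: DataStream[(Long, Map[Int, Int])] = stateCnts
  .keyBy(_._3) // key by the output timestamp
  .timeWindow(Time.minutes(1)) // hypothetical size, match your first window
  .apply { (ts: Long, w: TimeWindow, in: Iterable[(Int, Int, Long)],
            out: Collector[(Long, Map[Int, Int])]) =>
    out.collect((ts, in.map(t => (t._1, t._2)).toMap))
  }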

Also note that the state size of this program depends on the number of
unique ids. That might cause problems if the id space grows very fast.

Please let me know if you have questions or if that works ;-)

Cheers, Fabian
