Posted to user@flink.apache.org by Simone Robutti <si...@radicalbit.io> on 2016/09/27 14:01:04 UTC

Merge the states of different partitions in streaming

Hello,

I'm dealing with an analytical job in streaming and I don't know how to
write the last part.

I want to count all the elements in a window that have a given status, so I
keep a state of type Map[Status, Long]. This state is updated from tuples
containing the old status and the new status: every event generates a +1 for
the new status and a -1 for the old status. Then I want to reduce all these
counts, moving from a local, partial state to a global state that is written
to the output.
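The update step described above can be sketched in plain Scala, outside Flink. This is only an illustration of the +1/-1 bookkeeping; the names applyTransition, the String-based Status, and the Option for "no previous status" are my assumptions, not the actual job code:

```scala
// Minimal sketch of the per-event state update (not the real updateState).
// Status is modeled as a plain String here; the real job may use an enum.
type Status = String
type Counts = Map[Status, Long]

// Apply one status-transition event to the running counts:
// +1 for the new status, -1 for the old one (if any).
def applyTransition(counts: Counts,
                    oldStatus: Option[Status],
                    newStatus: Status): Counts = {
  val plusNew = counts.updated(newStatus, counts.getOrElse(newStatus, 0L) + 1L)
  oldStatus.fold(plusNew) { old =>
    plusNew.updated(old, plusNew.getOrElse(old, 0L) - 1L)
  }
}
```

For example, an element moving from RESERVED to PACKED turns Map(RESERVED -> 1) into Map(RESERVED -> 0, PACKED -> 1), which matches the partial counts shown in the output below.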

Right now my code looks like this:

filteredLatestOrders
  .keyBy(x => x._1.getStatus)
  .mapWithState(updateState)
  .keyBy(x => "0")
  .reduce((orderA, orderB) => orderA.sum(orderB))

where "filteredLatestOrders" is a DataStream containing information about
the elements, the new status and the old status.
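The sum called in the reduce step is presumably a key-by-key merge of two partial count maps. A minimal sketch of such a merge in plain Scala (the name sumCounts and the String-based Status are my assumptions):

```scala
type Status = String
type Counts = Map[Status, Long]

// Merge two partial count maps by summing the counts per status;
// statuses missing from one side count as 0.
def sumCounts(a: Counts, b: Counts): Counts =
  (a.keySet ++ b.keySet).map { s =>
    s -> (a.getOrElse(s, 0L) + b.getOrElse(s, 0L))
  }.toMap
```

If the reduce actually ran over all partitions, merging the partial maps this way would recover the global per-status counts.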

This produces the following output:

2> Map(DRAFT -> 0, RESERVED -> 1, PICKED -> 0, PACKED -> 0)
2> Map(DRAFT -> 0, RESERVED -> 2, PICKED -> 0, PACKED -> 0)
2> Map(DRAFT -> 0, RESERVED -> 3, PICKED -> 0, PACKED -> 1)
3> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 1, PACKED -> 0)
4> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 0, PACKED -> 1)

I thought that keying by a fixed value would collect all the elements on a
single node so that I could compute the final result there, but they stay on
different nodes and are never summed.

Is this the correct approach? If so, how can I achieve what I need? Is there
a smarter way to count distinct, evolving elements by their status in a
stream? Note that the original source of events is updates to the status of
an element, and the requirement is to count only the latest status available
for each element.

Thank you in advance,

Simone

Re: Merge the states of different partitions in streaming

Posted by Ufuk Celebi <uc...@apache.org>.
Great to hear!

On Wed, Sep 28, 2016 at 5:18 PM, Simone Robutti
<si...@radicalbit.io> wrote:
> Solved. Probably there was an error in the way I was testing. Also I
> simplified the job and it works now.
>
> 2016-09-27 16:01 GMT+02:00 Simone Robutti <si...@radicalbit.io>:
>> [original message quoted in full; snipped]

Re: Merge the states of different partitions in streaming

Posted by Simone Robutti <si...@radicalbit.io>.
Solved. Probably there was an error in the way I was testing. Also I
simplified the job and it works now.

2016-09-27 16:01 GMT+02:00 Simone Robutti <si...@radicalbit.io>:

> [original message quoted in full; snipped]