Posted to user@flink.apache.org by gerardg <ge...@talaia.io> on 2018/02/20 12:44:46 UTC

Get which key groups are assigned to an operator

Hello,

To improve performance we keep our own "keyed state" in the operator's
memory: basically a Map that holds the state for each key. The
problem comes when we want to restore the state after a failure or after
rescaling the operator. What we do is send the concatenation of
all the state to every operator using a union redistribution, and then
restore the "in-memory state" every time we see a new key. After a
while, we just clear the redistributed state. This is somewhat complex and
error-prone, so we would like to find an alternative way of doing this.

As far as I know, Flink knows which keys belong to each operator
(by distributing key groups), so I guess it would be possible to
calculate the key-group id of each stored key and restore the in-memory
state at once if we could access the key-group mapping. Is that
possible? We could patch Flink if necessary to access that information.

Thanks, 

Gerard



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Get which key groups are assigned to an operator

Posted by Gerard Garcia <ge...@talaia.io>.
You are right that the best solution would probably be to use
different state backends for different operators. I hope it gets
implemented at some point. Meanwhile, I'll take a look at the methods in
org.apache.flink.runtime.state.KeyGroupRangeAssignment; maybe I can find a
workaround that is good enough for me.

Thanks,

Gerard
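
One possible shape for such a workaround, as a minimal sketch rather than
a tested Flink integration: after a union redistribution, each subtask
keeps only the entries whose key it owns. The class and method names
below are hypothetical, and keyGroupFor is only a stand-in for Flink's
hashing step. In a real job the owner check would presumably be
KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism,
parallelism), with the subtask index and max parallelism taken from the
runtime context, so that it agrees with how keyBy routes records.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: filters union-redistributed state down to owned keys. */
final class RestoreFilterSketch {

    // Stand-in for Flink's key-group hashing (assumption: NOT Flink's real hash).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Index of the parallel subtask that owns the key's key group.
    static int ownerOf(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    // Returns a new map holding only the entries owned by the given subtask.
    static <K, V> Map<K, V> keepOwnedEntries(
            Map<K, V> unionState, int maxParallelism, int parallelism, int subtaskIndex) {
        Map<K, V> owned = new HashMap<>();
        for (Map.Entry<K, V> entry : unionState.entrySet()) {
            if (ownerOf(entry.getKey(), maxParallelism, parallelism) == subtaskIndex) {
                owned.put(entry.getKey(), entry.getValue());
            }
        }
        return owned;
    }
}
```

With this, the in-memory map could be rebuilt once at restore time instead
of lazily on the first record of each key, and the redistributed copy
could be dropped right after filtering.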


Re: Get which key groups are assigned to an operator

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

OK, now I understand your goal a bit better. I would still like to point out that it may take more work than it appears. To name just one example, you probably also want to support asynchronous snapshots, which is most likely difficult with just a hashmap. I think the proper solution for you (and something we are considering supporting in the future) is to allow different state backends for different operators in a job, but that is currently not possible. To answer your other question: you can currently compute everything about key groups and their assignment to operators using the methods in org.apache.flink.runtime.state.KeyGroupRangeAssignment.

Best,
Stefan
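
For readers unfamiliar with key groups, the arithmetic behind that
assignment can be sketched as follows. This is a self-contained
illustration, not Flink code: the class and method names are
hypothetical, and keyGroupFor uses a plain modulo as a stand-in for the
murmur hash that Flink applies to key.hashCode(). The index and range
formulas are meant to mirror computeOperatorIndexForKeyGroup and
computeKeyGroupRangeForOperatorIndex in KeyGroupRangeAssignment; check
them against the Flink version in use before relying on them.

```java
/** Hypothetical sketch of key-group arithmetic; not part of Flink. */
final class KeyGroupSketch {

    // Stand-in for Flink's hashing step (assumption: NOT Flink's real hash).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Parallel subtask index that owns the given key group.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // First key group (inclusive) owned by the subtask: ceil(index * max / parallelism).
    static int rangeStart(int operatorIndex, int maxParallelism, int parallelism) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the subtask.
    static int rangeEnd(int operatorIndex, int maxParallelism, int parallelism) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }
}
```

For example, with a max parallelism of 128 and a parallelism of 4, subtask
1 owns key groups 32 through 63. Because the number of key groups is fixed
by the max parallelism, rescaling only changes which contiguous range each
subtask owns.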


Re: Get which key groups are assigned to an operator

Posted by Gerard Garcia <ge...@talaia.io>.
Hi Stefan, thanks

Yes, we are also using keyed state in other operators. The problem is that
serialization is quite expensive, and in some of them we would prefer to
avoid it by storing the state in memory (for our use case, one specific
operator with in-memory state gives at least a 30% throughput improvement).
When we are not operating on a keyed stream it is easy: basically all the
operators have the same in-memory state. What we would like to do is the
same, but when operating on a keyed stream. Does it make more sense
now?

We are using RocksDB as the state backend and, as far as I know, elements
are always serialized when stored in the state. I'm also not sure whether
there is some disk access (maybe not synchronous) that could hurt
performance.

Gerard


Re: Get which key groups are assigned to an operator

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

From what I read, I get the impression that you are trying to implement your own "keyed state" with a hashmap. Why not use the keyed state that is already provided by Flink, which gives you efficient rescaling etc. out of the box? Please see [1] for the details.

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#using-managed-keyed-state
