Posted to user@flink.apache.org by Dmitry Golubets <dg...@gmail.com> on 2017/02/13 18:30:15 UTC

A way to control redistribution of operator state?

Hi,

It currently looks impossible to implement keyed state on top of operator
state.

I know this sounds like "just use keyed state", but the latter has to be
updated on every value change, unlike operator state, and that can be
expensive (especially if you have to deal with mutable structures inside it,
which have to be serialized).

The problem is that there is no way to tell Flink how to reassign the parts
of a savepoint between partitions, so it is impossible to route data to the
correct partitions.

Is there anything I missed, or is there a plan to implement this in the
future?

Best regards,
Dmitry

Re: A way to control redistribution of operator state?

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I think the clean solution would be to use raw keyed state once it becomes available. In the meantime, your solution could work. However, you should be aware that your approach does not rely on a contract but on an implementation detail that *could* change between versions and break your code in subtle ways.

Best,
Stefan

Re: A way to control redistribution of operator state?

Posted by Dmitry Golubets <dg...@gmail.com>.
Hi,

I played with it more today and I think I've found a workaround.

Here is what I do:
1. I define a constant number N of logical groups.
2. I use a consistent hash mapping of data keys to these groups.
3. I map these groups to partitions using an even distribution (the same way
Flink distributes state).
4. In the stateful function I can calculate which groups are assigned to its
partition and produce the right number of states, one for each group
(empty states too).
5. I do manual partitioning before that stateful function, using the same
group calculations.
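
The five steps above can be sketched roughly as follows. This is plain Python
for the arithmetic only, not Flink code: `NUM_GROUPS`, the CRC32 hash and all
function names are hypothetical, and the contiguous even split in step 3 is an
assumption made for the sketch. Whatever layout Flink really uses, steps 3 and
5 would have to mirror it.

```python
import zlib

# Step 1: constant number of logical groups (hypothetical value).
# It must stay the same across savepoints and restores.
NUM_GROUPS = 128


def group_of(key: str) -> int:
    """Step 2: stable hash from a data key to a logical group.

    CRC32 is an arbitrary choice; any hash that is stable across runs works.
    """
    return zlib.crc32(key.encode("utf-8")) % NUM_GROUPS


def groups_of_partition(partition: int, parallelism: int) -> range:
    """Step 3: contiguous, even split of groups over partitions (assumed)."""
    base, extra = divmod(NUM_GROUPS, parallelism)
    # The first `extra` partitions own one group more than the others.
    start = partition * base + min(partition, extra)
    size = base + (1 if partition < extra else 0)
    return range(start, start + size)


def partition_of(group: int, parallelism: int) -> int:
    """Step 5: inverse of groups_of_partition, for the manual partitioner."""
    base, extra = divmod(NUM_GROUPS, parallelism)
    boundary = extra * (base + 1)  # first group owned by a "small" partition
    if group < boundary:
        return group // (base + 1)
    return extra + (group - boundary) // base


def snapshot(partition: int, parallelism: int, state_by_group: dict) -> list:
    """Step 4: one list entry per owned group, in group order.

    Groups without data still get an (empty) entry so the list length
    always matches the number of groups owned by the partition.
    """
    owned = groups_of_partition(partition, parallelism)
    return [state_by_group.get(g, {}) for g in owned]
```

Under these assumptions, concatenating the snapshots of all partitions in
partition order gives exactly one entry per group, in group order. An even,
order-preserving split at a new parallelism then hands each partition the
groups that `groups_of_partition` predicts, which is the property the
workaround depends on.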

So far it looks like scaling up and down results in correct behavior.
Can I rely on Flink distributing state evenly and in the order I return it
in the list?

Best regards,
Dmitry

Re: A way to control redistribution of operator state?

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

there is something that we call "raw keyed" operator state, which might serve exactly your purpose. It is already used internally by Flink’s window operator, but there is currently no public API for this feature. The way it currently works is that you obtain input and output streams that are aware of the key-groups being written or read, but the API needs to take into account that each key-group must be written only once and completely before the next key-group can start. This is a bit tricky to expose for inheritance hierarchies. My guess is that you can expect this for the next version of Flink.
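
The write contract described above (each key-group written once, and finished before the next one starts) can be illustrated with a small sketch. This is plain Python and not Flink's API: `KeyGroupedWriter`, `read_key_group` and the byte-range bookkeeping are hypothetical names and choices made only for illustration.

```python
import io


class KeyGroupedWriter:
    """Hypothetical writer that records the byte range of each key-group."""

    def __init__(self, key_groups):
        self._allowed = set(key_groups)  # key-groups this writer may write
        self._stream = io.BytesIO()
        self._current = None             # key-group currently being written
        self._start = 0                  # start offset of the current key-group
        self.ranges = {}                 # key-group id -> (start, end) offsets

    def start_key_group(self, key_group: int) -> None:
        if key_group not in self._allowed:
            raise ValueError(f"key-group {key_group} is not owned by this writer")
        if key_group in self.ranges or key_group == self._current:
            raise ValueError(f"key-group {key_group} was already written")
        # Starting a new key-group completes the previous one.
        self._close_current()
        self._current = key_group
        self._start = self._stream.tell()

    def write(self, data: bytes) -> None:
        if self._current is None:
            raise ValueError("no key-group has been started")
        self._stream.write(data)

    def _close_current(self) -> None:
        if self._current is not None:
            self.ranges[self._current] = (self._start, self._stream.tell())
            self._current = None

    def finish(self) -> bytes:
        """Complete the last key-group and return everything written."""
        self._close_current()
        return self._stream.getvalue()


def read_key_group(data: bytes, ranges: dict, key_group: int) -> bytes:
    """Return only the bytes that belong to one key-group."""
    start, end = ranges[key_group]
    return data[start:end]
```

Because every key-group ends up as one contiguous byte range, a restore could hand each range to whichever subtask owns that key-group, independent of which subtask wrote it.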

Best,
Stefan

Re: A way to control redistribution of operator state?

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Dmitry,

Technically, from the looks of the internal code around `OperatorStateRepartitioner`, I think it would certainly be possible to make it pluggable.
Right now it is simply hard-coded to use a round-robin repartitioner implementation by default.
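
To illustrate why a fixed policy of this kind gets in the way of key-based routing, here is a toy round-robin redistribution of list-style operator state. Assumption: this is not the code behind `OperatorStateRepartitioner`; the function name and the data layout are made up, and the real implementation may assign entries differently.

```python
def redistribute_round_robin(old_states, new_parallelism):
    """Spread state entries over the new subtasks in round-robin order.

    old_states: one list of state entries per old subtask (hypothetical layout).
    Returns one list of state entries per new subtask.
    """
    # Flatten the entries in subtask order, keeping each subtask's own order.
    entries = [entry for subtask in old_states for entry in subtask]
    new_states = [[] for _ in range(new_parallelism)]
    for i, entry in enumerate(entries):
        # The target depends only on the entry's position, never on its content.
        new_states[i % new_parallelism].append(entry)
    return new_states
```

For example, `redistribute_round_robin([["a", "b", "c"], ["d", "e"]], 3)` yields `[["a", "d"], ["b", "e"], ["c"]]`. The user function has no say in where an entry lands, which is why records cannot be routed to the subtask that holds their state.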

However, I’m not sure whether there are plans to expose this to the user and make it configurable.
Looping in Stefan (in cc), who did most of the work on this part, to see if he can provide more info.

- Gordon
