Posted to user@flink.apache.org by bastien dine <ba...@gmail.com> on 2018/12/12 10:21:10 UTC

Question about key group / key state & parallelism

Hello everyone,

I have a question regarding keyed state and the parallelism of a process
operation.

The docs say: "You can think of Keyed State as Operator State that has been
partitioned, or sharded, with exactly one state-partition per key. Each
keyed-state is logically bound to a unique composite of
<parallel-operator-instance, key>, and since each key “belongs” to exactly
one parallel instance of a keyed operator, we can think of this simply as
<operator, key>."

If I have fewer parallel operator instances (say 5) than possible keys
(10), does that mean every instance will "manage" 2 key states? (Is this
spread evenly?)
Is the logical binding fixed? I mean, is the state always managed by the
same instance, or does this depend on the instances available at the
moment?

"During execution each parallel instance of a keyed operator works with the
keys for one or more Key Groups."
-> This is related: does "works with the keys" mean always the same keys?

Best Regards,
Bastien

------------------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
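The documentation passage quoted above (keyed state as operator state sharded with exactly one state-partition per key) can be pictured with a toy model. This is a minimal Python sketch, not Flink's API: `KeyedCounterInstance` is a hypothetical class standing in for one parallel operator instance, and a plain dict stands in for Flink's state backend.

```python
from collections import defaultdict


class KeyedCounterInstance:
    """Hypothetical stand-in for ONE parallel instance of a keyed operator.

    Its state is a single map partitioned by key, so each key owns exactly
    one state slot ("one state-partition per key").
    """

    def __init__(self):
        # key -> running sum for that key only (toy stand-in for keyed state)
        self.state = defaultdict(int)

    def process(self, key, value):
        # Only the slot belonging to `key` is read or updated.
        self.state[key] += value
        return self.state[key]


instance = KeyedCounterInstance()
instance.process("A", 1)
instance.process("B", 10)
instance.process("A", 2)
print(dict(instance.state))  # {'A': 3, 'B': 10}
```

In this model, two keys handled by the same instance never see each other's state, even though they share the instance.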

Re: Question about key group / key state & parallelism

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Bastien,

You are right: it will wait for message A to be processed. More generally,
this is the question of how to handle data skew when shuffling. The problem
is common, and there are already many ways to solve it depending on the
scenario.
I think we can solve your problem in the following ways:
- Define your own hash logic according to your business logic, for example
so that A and B have different hash values.
- Increase the maximum parallelism. There are exactly as many Key Groups as
the defined maximum parallelism, so the higher it is, the more key groups
there are. This reduces the probability that A and B fall into the same
key group, i.e., it reduces the probability that they end up in the same
instance.

Best, Hequn
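Both suggestions above depend on how key groups are laid out over instances. The following minimal Python sketch assumes the contiguous-range assignment used by Flink's `KeyGroupRangeAssignment` (key group `g` goes to instance `g * parallelism // max_parallelism`); `operator_index` and `groups_per_instance` are hypothetical helpers, and the parallelism values are illustrative only.

```python
def operator_index(group: int, max_parallelism: int, parallelism: int) -> int:
    # Assumed mapping: each instance owns one contiguous range of key groups.
    return group * parallelism // max_parallelism


def groups_per_instance(max_parallelism: int, parallelism: int) -> dict:
    """Map each instance index to the key groups it owns (hypothetical helper)."""
    owned = {}
    # There are exactly `max_parallelism` key groups: 0 .. max_parallelism - 1.
    for group in range(max_parallelism):
        idx = operator_index(group, max_parallelism, parallelism)
        owned.setdefault(idx, []).append(group)
    return owned


print(groups_per_instance(4, 2))  # {0: [0, 1], 1: [2, 3]}

# With 128 key groups and 2 instances, each instance owns 64 groups.
owned = groups_per_instance(128, 2)
print(len(owned[0]), len(owned[1]))  # 64 64
```

In this sketch the number of key groups is fixed by the maximum parallelism alone; changing the parallelism only moves the range boundaries, so a key's key group stays the same while the instance that owns it can change when the job is rescaled.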



On Wed, Dec 12, 2018 at 10:33 PM bastien dine <ba...@gmail.com>
wrote:

> Hi Hequn, thanks for your response!
>
> Ok, that's what I was thinking about keys and operator instances.
> If the assignment of key groups to instances is deterministic (and so is
> the hashing of a key into a key group), I have the following problem.
>
> Let's say I have 4 keys (A, B, C, D) and 2 parallel instances of my
> operator (1, 2).
> Flink determines that A/B belong to 1 and C/D belong to 2.
> If I have a message keyed by A, it will be processed by 1.
> But if the following message is keyed by B, it will wait for message A to
> be processed by 1 and then go to 1, even if 2 is not busy and could
> technically do the processing, right?
>
> How can I deal with that?
>
> Best regards and many thanks!
> Bastien

Re: Question about key group / key state & parallelism

Posted by Ken Krugler <kk...@transpac.com>.
If you don’t care about having the same key processed by the same operator instance (so no keyed state, no windowed processing of the same keys, etc.), then you can just use DataStream.rebalance() <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning>.

— Ken
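To contrast key-based routing with the round-robin behaviour of `rebalance()` mentioned above, here is a minimal Python sketch, not Flink code. It reuses Bastien's example assignment (A/B on one instance, C/D on the other, 0-based here); `KEY_TO_INSTANCE`, `route_keyed` and `route_rebalance` are hypothetical names. The sketch always starts the round-robin at instance 0, whereas Flink's partitioner may start at a different instance.

```python
from itertools import cycle

# Bastien's example: A/B live on instance 0, C/D on instance 1 (0-based here).
KEY_TO_INSTANCE = {"A": 0, "B": 0, "C": 1, "D": 1}


def route_keyed(keys):
    # keyBy-style: the target depends only on the key.
    return [KEY_TO_INSTANCE[k] for k in keys]


def route_rebalance(keys, parallelism):
    # rebalance()-style: round-robin over instances, the key is ignored.
    targets = cycle(range(parallelism))
    return [next(targets) for _ in keys]


stream = ["A", "B", "A", "B"]
print(route_keyed(stream))         # [0, 0, 0, 0]
print(route_rebalance(stream, 2))  # [0, 1, 0, 1]
```

With round-robin routing the second instance receives half of the records, but a downstream operator can no longer rely on seeing all records of one key in one place.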


> On Dec 12, 2018, at 6:33 AM, bastien dine <ba...@gmail.com> wrote:
> 
> Hi Hequn, thanks for your response!
>
> Ok, that's what I was thinking about keys and operator instances.
> If the assignment of key groups to instances is deterministic (and so is the hashing of a key into a key group), I have the following problem.
>
> Let's say I have 4 keys (A, B, C, D) and 2 parallel instances of my operator (1, 2).
> Flink determines that A/B belong to 1 and C/D belong to 2.
> If I have a message keyed by A, it will be processed by 1.
> But if the following message is keyed by B, it will wait for message A to be processed by 1 and then go to 1, even if 2 is not busy and could technically do the processing, right?
>
> How can I deal with that?
>
> Best regards and many thanks!
> Bastien

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: Question about key group / key state & parallelism

Posted by bastien dine <ba...@gmail.com>.
Hi Hequn, thanks for your response!

Ok, that's what I was thinking about keys and operator instances.
If the assignment of key groups to instances is deterministic (and so is
the hashing of a key into a key group), I have the following problem.

Let's say I have 4 keys (A, B, C, D) and 2 parallel instances of my
operator (1, 2).
Flink determines that A/B belong to 1 and C/D belong to 2.
If I have a message keyed by A, it will be processed by 1.
But if the following message is keyed by B, it will wait for message A to
be processed by 1 and then go to 1, even if 2 is not busy and could
technically do the processing, right?

How can I deal with that?

Best regards and many thanks!
Bastien
------------------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
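The waiting described above can be made concrete with a toy timeline. This is a minimal Python sketch under deliberately simplified assumptions: all records arrive at time 0, each has a made-up processing cost, and each instance handles its own records strictly one after another; `finish_times` and `mapping` are hypothetical names, not Flink code, and buffering and backpressure are ignored.

```python
def finish_times(events, key_to_instance, parallelism):
    """Toy timeline: every record arrives at t=0 and each instance processes
    its records strictly one after another, in arrival order.

    events: list of (key, cost) pairs; costs are made-up time units.
    Returns (finish time of each record, total busy time per instance).
    """
    busy_until = [0] * parallelism
    finished = []
    for key, cost in events:
        idx = key_to_instance[key]
        # A record starts only after earlier records on the SAME instance.
        busy_until[idx] += cost
        finished.append(busy_until[idx])
    return finished, busy_until


# Bastien's assignment (0-based): A/B -> instance 0, C/D -> instance 1.
mapping = {"A": 0, "B": 0, "C": 1, "D": 1}
finished, busy = finish_times([("A", 5), ("B", 1)], mapping, 2)
print(finished)  # [5, 6]: B waits for A
print(busy)      # [6, 0]: instance 1 stays idle
```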


On Wed, Dec 12, 2018 at 13:39, Hequn Cheng <ch...@gmail.com> wrote:

> Hi Bastien,
>
> Each key “belongs” to exactly one parallel instance of a keyed operator,
> and each parallel instance contains one or more Key Groups.
> Keys are hashed into their corresponding key group deterministically.
> The hash depends on the key's value, not on the total number of records.
> Different keys do not affect each other, even when a parallel instance
> contains several Key Groups.
>
> Best, Hequn

Re: Question about key group / key state & parallelism

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Bastien,

Each key “belongs” to exactly one parallel instance of a keyed operator,
and each parallel instance contains one or more Key Groups.
Keys are hashed into their corresponding key group deterministically.
The hash depends on the key's value, not on the total number of records.
Different keys do not affect each other, even when a parallel instance
contains several Key Groups.

Best, Hequn
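The two-step, value-based routing described above, and why it does not balance load by record count, can be sketched as follows. This is a minimal Python sketch, not Flink's implementation: `MAX_PARALLELISM` and `PARALLELISM` are assumed values, `instance_for` is a hypothetical helper, and `zlib.crc32` stands in for the murmur hash Flink applies to the key's `hashCode()`, so the concrete instance numbers will differ from a real job.

```python
import zlib
from collections import Counter

MAX_PARALLELISM = 128  # assumed value for illustration
PARALLELISM = 2        # assumed value for illustration


def instance_for(key: str) -> int:
    # 1) key -> key group, computed from the key's value only.
    #    Flink applies a murmur hash to key.hashCode(); zlib.crc32 is a
    #    deterministic stand-in, so the concrete numbers will differ.
    group = zlib.crc32(key.encode("utf-8")) % MAX_PARALLELISM
    # 2) key group -> instance, via contiguous ranges of key groups.
    return group * PARALLELISM // MAX_PARALLELISM


# A skewed stream: eight records for "A", one each for "B", "C", "D".
stream = ["A"] * 8 + ["B", "C", "D"]
load = Counter(instance_for(k) for k in stream)

# The same key always goes to the same instance, however many records it has.
assert len({instance_for(k) for k in ["A"] * 8}) == 1
print(load)
```

Because routing looks only at the key's value, every record for the hot key "A" lands on the same instance regardless of how busy that instance already is.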


On Wed, Dec 12, 2018 at 6:21 PM bastien dine <ba...@gmail.com> wrote:

> Hello everyone,
>
> I have a question regarding keyed state and the parallelism of a process
> operation.
>
> The docs say: "You can think of Keyed State as Operator State that has been
> partitioned, or sharded, with exactly one state-partition per key. Each
> keyed-state is logically bound to a unique composite of
> <parallel-operator-instance, key>, and since each key “belongs” to exactly
> one parallel instance of a keyed operator, we can think of this simply as
> <operator, key>."
>
> If I have fewer parallel operator instances (say 5) than possible keys
> (10), does that mean every instance will "manage" 2 key states? (Is this
> spread evenly?)
> Is the logical binding fixed? I mean, is the state always managed by the
> same instance, or does this depend on the instances available at the
> moment?
>
> "During execution each parallel instance of a keyed operator works with
> the keys for one or more Key Groups."
> -> This is related: does "works with the keys" mean always the same keys?
>
> Best Regards,
> Bastien