Posted to user@flink.apache.org by Christophe Jolif <cj...@gmail.com> on 2018/04/11 21:16:49 UTC

keyBy and parallelism

Hi all,

Imagine I have a default parallelism of 16 and I do something like

stream.keyBy("something").flatMap()

Now let's imagine I have fewer than 16 keys, maybe 8.

How many parallel executions of the flatMap function will I get? 8 because
I have 8 keys, or 16 because my default parallelism is 16?

(and I will have follow-up questions depending on the answer, I suspect ;))

Thanks,
-- 
Christophe

Re: keyBy and parallelism

Posted by Ken Krugler <kk...@transpac.com>.
I’m not sure I understand the actual use case, but …

I think you’d need a rebalance() to distribute the records evenly (round-robin) across the operator’s parallel subtasks to support “even if I have fewer keys than slots, I want each slot to take its share of the work”.

So it sounds like you want to (a) broadcast all rules (so every operator task has all of the rules), and then (b) distribute the keyed events evenly across the operator’s tasks.

Then have a custom function that examines each event’s key to figure out what rule(s) to apply.

There are often timing issues here, where you have to buffer events while waiting for all (for some definition of “all”) of the rules to arrive before you start processing them.
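
To make steps (a) and (b) concrete, here is a minimal plain-Java model of the idea. It is only a sketch and does not use the Flink API: the `Worker` class, the `run` helper, and the `key-N` / `rule-N` names are all made up for illustration. In a real job the two steps would roughly correspond to calling `broadcast()` on the rules stream and `rebalance()` on the events stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

class BroadcastRebalanceSketch {
    /** Hypothetical stand-in for one parallel task of the operator. */
    static class Worker {
        final Map<String, String> rules = new HashMap<>();  // full copy of all rules
        final List<String> output = new ArrayList<>();

        void onRule(String key, String rule) {
            rules.put(key, rule);
        }

        void onEvent(String key, String payload) {
            // The rule is looked up by the event's key, so no keyBy is needed.
            String rule = rules.getOrDefault(key, "no-rule");
            output.add(key + ":" + rule + ":" + payload);
        }
    }

    static List<Worker> run(int parallelism, int keys, int events) {
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            workers.add(new Worker());
        }
        // (a) broadcast: every worker receives every rule.
        for (int k = 0; k < keys; k++) {
            for (Worker w : workers) {
                w.onRule("key-" + k, "rule-" + k);
            }
        }
        // (b) round-robin: events go to workers in turn, whatever their key is.
        Random random = new Random(42);  // fixed seed, only to pick arbitrary keys
        for (int e = 0; e < events; e++) {
            String key = "key-" + random.nextInt(keys);
            workers.get(e % parallelism).onEvent(key, "event-" + e);
        }
        return workers;
    }

    public static void main(String[] args) {
        List<Worker> workers = run(16, 8, 160);
        for (int i = 0; i < workers.size(); i++) {
            System.out.println("worker " + i + " processed " + workers.get(i).output.size());
        }
    }
}
```

The trade-off in this model is that every worker holds every rule in its own memory instead of in per-key state, and events of one key are no longer processed in order by a single task. In this demo all rules arrive before any event, so the buffering mentioned above is left out.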

— Ken


--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378


Re: keyBy and parallelism

Posted by Christophe Jolif <cj...@gmail.com>.
Sihua,

On Thu, Apr 12, 2018 at 10:04 AM, 周思华 <su...@163.com> wrote:

> Hi Christophe,
> I think what you want to do is a "stream join", and I'm a bit confused:
> if you already know there are only 8 keys, why would you still want to
> use a parallelism of 16? 8 of the parallel subtasks will be idle (a waste
> of CPU). In a KeyedStream, the tuples with the same key are sent to the
> same parallel subtask.
>


First, my 8 keys and parallelism of 16 is just an example. Real life is a
bit more complicated. But basically the idea is that I have a certain number
of task slots, and I want to keep them busy so that my processing is as fast
as possible. Even if I have fewer keys than slots, I want each slot to take
its share of the work.


>
> I'm also a bit confused by the pseudo code: it looks like you assume that
> the tuple with a given key in stream A will always arrive before the tuple
> with that key in stream B? I don't think that can be guaranteed... you may
> need to store the tuple from stream B in case it arrives before the one
> from A, and do the "analysis logic" in both flatMap1() and flatMap2().
>


You are right. I just wanted to focus on my issue, which is:

1/ having co-processing that considers only elements of the same key and
that can store the "rules" in keyed state (and, as you said, I might have
to store other things for ordering reasons)
2/ but being able to parallelize the processing of a given key so that it
uses as much parallelism as my cluster allows.



Re: keyBy and parallelism

Posted by 周思华 <su...@163.com>.
Hi Christophe,
I think what you want to do is a "stream join", and I'm a bit confused: if you already know there are only 8 keys, why would you still want to use a parallelism of 16? 8 of the parallel subtasks will be idle (a waste of CPU). In a KeyedStream, the tuples with the same key are sent to the same parallel subtask.


I'm also a bit confused by the pseudo code: it looks like you assume that the tuple with a given key in stream A will always arrive before the tuple with that key in stream B? I don't think that can be guaranteed... you may need to store the tuple from stream B in case it arrives before the one from A, and do the "analysis logic" in both flatMap1() and flatMap2().
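
As an illustration of that buffering, here is a minimal plain-Java sketch, not actual Flink code. The two maps are hypothetical stand-ins for per-key state (in a real job, something like a ValueState for the rule and a ListState for the pending events inside a RichCoFlatMapFunction), and `onRule` / `onEvent` play the roles of flatMap1() and flatMap2(). The class name, the `apply` helper, and the sample keys are made up for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BufferUntilRuleSketch {
    // Stand-ins for keyed state: one rule and one pending-event buffer per key.
    private final Map<String, String> ruleByKey = new HashMap<>();
    private final Map<String, List<String>> pendingByKey = new HashMap<>();
    final List<String> collected = new ArrayList<>();

    /** Role of flatMap1(): a rule arrived on stream A. */
    void onRule(String key, String rule) {
        ruleByKey.put(key, rule);
        // Apply the rule to the events that arrived before it, in arrival order.
        List<String> pending = pendingByKey.remove(key);
        if (pending != null) {
            for (String event : pending) {
                collected.add(apply(rule, event));
            }
        }
    }

    /** Role of flatMap2(): an event arrived on stream B. */
    void onEvent(String key, String event) {
        String rule = ruleByKey.get(key);
        if (rule == null) {
            // No rule yet for this key: keep the event until the rule shows up.
            pendingByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        } else {
            collected.add(apply(rule, event));
        }
    }

    private static String apply(String rule, String event) {
        return rule + "(" + event + ")";
    }

    public static void main(String[] args) {
        BufferUntilRuleSketch op = new BufferUntilRuleSketch();
        op.onEvent("key1", "e1");    // arrives before its rule, so it is buffered
        op.onRule("key1", "upper");  // flushes e1
        op.onEvent("key1", "e2");    // rule is known, processed immediately
        op.onEvent("key2", "e3");    // key2 has no rule yet, stays buffered
        System.out.println(op.collected);  // prints [upper(e1), upper(e2)]
    }
}
```

Note that events for a key whose rule never arrives stay buffered forever in this sketch; a real job would need some bound or timeout on that buffer.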


Regards,
Sihua Zhou



Re: keyBy and parallelism

Posted by Christophe Jolif <cj...@gmail.com>.
Thanks Chesnay (and others).

That's what I was figuring out. Now let's move on to the follow-up with my
exact use case.

I have two streams A and B. A basically receives "rules" that the
processing of B should follow.

There is a "key" that allows me to know that a rule x arriving on A applies
to events with the same key arriving on B.

I was planning to do (pseudo code):

A.connect(B).keyBy("thekey").flatMap(
   flatMap1()
      -> store in a ValueState the rule
   flatMap2()
      -> use the state to get the rule, transform the element according to the rule, collect it
)


I think it should work, right? The ValueState will be "per key" and will
contain the rule for that key.

Now, what I really care about is not having all the elements of key1 in the
same parallel instance; I just want to make sure key1 and key2 are isolated
so that I can use keyed state to store the corresponding rule, and key2
rules are not used for key1 and vice versa.

So ideally, instead of using only 8 parallel instances, I would like to use
all 16 to get the full power of my system, even with 8 keys, as I don't care
about all elements of key1 being in the same parallel instance. All I care
about is that the state contains the rule corresponding to the key.

What would be the recommended approach here?

Thanks again for your help,
--
Christophe



Re: keyBy and parallelism

Posted by Chesnay Schepler <ch...@apache.org>.
You will get 16 parallel executions since you specify a parallelism of
16; however, at least 8 of these will not get any data.
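
The reason is that each key is hashed to a key group, and each key group belongs to exactly one parallel subtask, so 8 keys can reach at most 8 subtasks (and fewer if two keys land on the same one). The following plain-Java sketch models that routing. It is an approximation under stated assumptions: `mix` is a stand-in bit mixer rather than Flink's own hash, the max parallelism of 128 is assumed, and the `key-N` names are invented, so the exact subtask numbers will differ from a real job.

```java
import java.util.Set;
import java.util.TreeSet;

class KeyRoutingSketch {
    /** Stand-in bit mixer; Flink applies its own hash on top of key.hashCode(). */
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        h *= 0xc2b2ae35;
        h ^= (h >>> 16);
        return h;
    }

    /** key -> key group -> index of the parallel subtask that owns the group. */
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(mix(key.hashCode()), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    /** Set of subtask indexes that receive at least one of the keys. */
    static Set<Integer> busySubtasks(int keys, int maxParallelism, int parallelism) {
        Set<Integer> busy = new TreeSet<>();
        for (int i = 0; i < keys; i++) {
            busy.add(subtaskFor("key-" + i, maxParallelism, parallelism));
        }
        return busy;
    }

    public static void main(String[] args) {
        Set<Integer> busy = busySubtasks(8, 128, 16);
        System.out.println("subtasks receiving data: " + busy.size() + " of 16 -> " + busy);
    }
}
```

Whatever the hash function, the number of busy subtasks can never exceed the number of distinct keys, which is why raising the parallelism alone does not spread 8 keys over 16 slots.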



Re: keyBy and parallelism

Posted by Hao Sun <ha...@zendesk.com>.
From what I have learnt, you have to control parallelism yourself. You can
set the parallelism on an operator or set the default one through
flink-conf.yaml.
I might be wrong.
