Posted to user@flink.apache.org by Gwenhael Pasquiers <gw...@ericsson.com> on 2016/02/03 15:48:39 UTC

Distribution of sinks among the nodes

Hi,

We are trying to deploy an application with the following “architecture”:

4 Kafka sources => 16 maps => 4 Kafka sinks, on 4 nodes, with 24 slots (we disabled operator chaining).

So on each node we’d like:
1x source => 4x map => 1x sink

That way there would be no exchanges between different Flink instances, and performance would be optimal.

But we get (according to the Flink GUI and the Host column in the details of each task):

Node 1 : 1 source =>  2 maps
Node 2 : 1 source =>  1 map
Node 3 : 1 source =>  1 map
Node 4 : 1 source =>  12 maps => 4 sinks

(I think no comment is needed ☺)

The Web UI says that there are 24 slots and that they are all used, but they don’t seem to be evenly distributed…

How can we make Flink deploy the tasks the way we want?

B.R.

Gwen’

RE: Distribution of sinks among the nodes

Posted by Gwenhael Pasquiers <gw...@ericsson.com>.
Sorry, I was confused about the number of slots; it’s fine now.

However, are disableChaining and disableOperatorChaining working properly?
I called those methods everywhere I could, but it still seems that some of my operators are being chained together: I can’t get above 16 used slots, where I should be at 24 if there were no chaining…



From: Gwenhael Pasquiers [mailto:gwenhael.pasquiers@ericsson.com]
Sent: Thursday, 4 February 2016 09:55
To: user@flink.apache.org
Subject: RE: Distribution of sinks among the nodes

Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 sinks)?

Or is there a way to set the number of slots per TaskManager instead of globally, so that they are at least evenly distributed among the nodes?

As for the sink deployment: that’s not good news. We will have a non-negligible overhead: all the data generated by 3 of the 4 nodes will be sent to another node instead of to the “local” sink. Network I/O has a price.

Do you have some sort of “topology” feature on the roadmap? Maybe a listener on the JobManager / env that would be triggered to ask us on which node we would prefer each task to be deployed. That way you keep the standard behavior, don’t have to write a complicated generic optimization algorithm, and let the user make their own choices. Should I create a JIRA?

For the time being we could start the application 4 times, once per node, but that’s not pretty at all ☺

B.R.
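
The network overhead described in the message above can be estimated with a few lines of plain Java. This is only a back-of-the-envelope sketch, not Flink code: the class `SinkLocality` and the method `remoteFraction` are hypothetical names, and the model assumes that every mapper subtask spreads its output evenly over all sink subtasks and that both arrays list the same nodes in the same order.

```java
public class SinkLocality {

    /**
     * Fraction of mapper-to-sink traffic that crosses the network when every
     * mapper subtask sends an equal share of its output to every sink subtask.
     * mappersPerNode[n] and sinksPerNode[n] are the subtask counts on node n
     * (assumption: both arrays have the same length).
     */
    public static double remoteFraction(int[] mappersPerNode, int[] sinksPerNode) {
        long total = 0;
        long remote = 0;
        for (int m = 0; m < mappersPerNode.length; m++) {
            for (int s = 0; s < sinksPerNode.length; s++) {
                long channels = (long) mappersPerNode[m] * sinksPerNode[s];
                total += channels;
                if (m != s) {
                    remote += channels; // mapper and sink run on different nodes
                }
            }
        }
        return total == 0 ? 0.0 : (double) remote / total;
    }

    public static void main(String[] args) {
        int[] mappers = {4, 4, 4, 4};         // 16 mappers, 4 per node
        int[] sinksOnOneNode = {0, 0, 0, 4};  // all 4 sinks on node 4
        System.out.println(remoteFraction(mappers, sinksOnOneNode)); // prints 0.75
    }
}
```

Under these assumptions, three quarters of the mapper output leaves its node, which matches the "3 of the 4 nodes" estimate in the message.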

From: Till Rohrmann [mailto:trohrmann@apache.org]
Sent: Wednesday, 3 February 2016 17:58
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes


Hi Gwenhäel,

if you set the number of slots for each TaskManager to 4, then all of your mappers will be evenly spread out. The sources should also be evenly spread out. However, since the sinks depend on all mappers, where they are deployed will most likely be random. So you might end up with 4 sink tasks on one machine.

Cheers,
Till

On Wed, Feb 3, 2016 at 4:31 PM, Gwenhael Pasquiers <gw...@ericsson.com> wrote:
It is one type of mapper with a parallelism of 16.
It's the same for the sinks and sources (parallelism of 4).

The settings are:
env.setParallelism(4)
mapper.setParallelism(env.getParallelism() * 4)

We mean to have X mapper tasks per source / sink.

The mapper is doing some heavy computation and we have only 4 Kafka partitions. That's why we need more mappers than sources / sinks.


-----Original Message-----
From: Aljoscha Krettek [mailto:aljoscha@apache.org]
Sent: Wednesday, 3 February 2016 16:26
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi Gwenhäel,
when you say 16 maps, are we talking about one mapper with parallelism 16 or 16 unique map operators?

Regards,
Aljoscha


RE: Distribution of sinks among the nodes

Posted by Gwenhael Pasquiers <gw...@ericsson.com>.
Thanks,
One more thing to look forward to in the next version!



Re: Distribution of sinks among the nodes

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I just merged the new feature, so once this makes it into the 1.0-SNAPSHOT builds you should be able to use:

env.setParallelism(4);

env
    .addSource(kafkaSource)
    .rescale()
    .map(mapper).setParallelism(16)
    .rescale()
    .addSink(kafkaSink);

to get your desired behavior. For this to work, the mapper parallelism should be set to 16, with 4 nodes. Then each node will have 1 source, 4 mappers and 1 sink. Each source will only be connected to its 4 local mappers, and those 4 mappers will be the only ones connected to the local sink.

Cheers,
Aljoscha
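
To make the connection pattern described above concrete, here is a minimal plain-Java sketch of which subtasks end up connected when going from 4 sources to 16 mappers and from 16 mappers to 4 sinks. It does not use Flink at all: `RescaleSketch` and `targets` are hypothetical names, the sketch assumes one parallelism is an exact multiple of the other, and the exact index assignment inside Flink may differ from the contiguous grouping used here.

```java
import java.util.ArrayList;
import java.util.List;

public class RescaleSketch {

    /**
     * Downstream subtask indices that upstream subtask `up` sends to.
     * Assumption: the larger parallelism is an exact multiple of the smaller one.
     */
    public static List<Integer> targets(int up, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> out = new ArrayList<>();
        if (downstreamParallelism >= upstreamParallelism) {
            // Fan out: each upstream subtask feeds its own contiguous group.
            int factor = downstreamParallelism / upstreamParallelism;
            for (int i = 0; i < factor; i++) {
                out.add(up * factor + i);
            }
        } else {
            // Fan in: each group of upstream subtasks feeds a single downstream subtask.
            int factor = upstreamParallelism / downstreamParallelism;
            out.add(up / factor);
        }
        return out;
    }

    public static void main(String[] args) {
        for (int source = 0; source < 4; source++) {
            System.out.println("source " + source + " -> mappers " + targets(source, 4, 16));
        }
        for (int mapper = 0; mapper < 16; mapper++) {
            System.out.println("mapper " + mapper + " -> sink " + targets(mapper, 16, 4));
        }
    }
}
```

In this model source 1 feeds mappers 4 to 7, and those same four mappers all feed sink 1, so a scheduler that co-locates connected subtasks can keep the whole group on one node.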



Re: Distribution of sinks among the nodes

Posted by Aljoscha Krettek <al...@apache.org>.
I added a new Ticket: https://issues.apache.org/jira/browse/FLINK-3336

This will implement the data shipping pattern that you mentioned in your initial mail. I also have the Pull request almost ready.



RE: Distribution of sinks among the nodes

Posted by Gwenhael Pasquiers <gw...@ericsson.com>.
Okay.

Then I guess the best we can do is to disable chaining (we really want one thread per operator, since they perform long operations) and give the sinks the same parallelism as the mappers: that way each map will have its own sink and there will be no exchanges between Flink instances.

From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On Behalf Of Stephan Ewen
Sent: Thursday, 4 February 2016 15:13
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

To your other question, there are two things in Flink:

(1) Chaining. Tasks are folded together into one task, run by one thread.

(2) Resource groups: Tasks stay separate, have separate threads, but share a slot (which means share memory resources). See the link in my previous mail for an explanation concerning those.

Greetings,
Stephan


On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen <se...@apache.org> wrote:
Hi Gwen!

You actually do not need 24 slots, only as many as the highest parallelism (16). Slots do not hold individual tasks, but "pipelines".

Here is an illustration of how that works:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots

You can control whether a task can share a slot with the previous task using the function "startNewResourceGroup()" in the streaming API. Sharing slots makes a few things easier to reason about; in particular, when you add operators to a program, you need not immediately add new machines.
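
The slot arithmetic above (24 versus 16) can be written out as a small plain-Java sketch. It is not Flink API: `SlotCount`, `slotsWithoutSharing` and `slotsWithSharing` are hypothetical names, and the model simply assumes that with slot sharing one slot can hold one subtask of each operator, while without sharing every subtask needs a slot of its own.

```java
public class SlotCount {

    /** Every subtask gets its own slot: the sum of all operator parallelisms. */
    public static int slotsWithoutSharing(int... parallelisms) {
        int sum = 0;
        for (int p : parallelisms) {
            sum += p;
        }
        return sum;
    }

    /** One slot holds a whole pipeline: the highest operator parallelism. */
    public static int slotsWithSharing(int... parallelisms) {
        int max = 0;
        for (int p : parallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    public static void main(String[] args) {
        // source = 4, mapper = 16, sink = 4, as in this thread
        System.out.println(slotsWithoutSharing(4, 16, 4)); // prints 24
        System.out.println(slotsWithSharing(4, 16, 4));    // prints 16
    }
}
```

This also suggests why the job never used more than 16 slots even with chaining disabled: chaining and slot sharing are separate mechanisms.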


How to solve your program case
--------------------------------------------

We can actually make a pretty simple addition to Flink that will cause the tasks to be locally connected, which in turn will cause the scheduler to distribute them like you intend.
Rather than let the 4 sources rebalance across all 16 mappers, each one should redistribute to 4 local mappers, and these 4 mappers should send data to one local sink each.

We'll try and add that today and ping you once it is in.

The following would be sample code to use this:

env.setParallelism(4);

env
    .addSource(kafkaSource)
    .partitionFan()
    .map(mapper).setParallelism(16)
    .partitionFan()
    .addSink(kafkaSink);



A bit of background on why the mechanism is the way it is right now
--------------------------------------------------------------------

You can think of a slot as a slice of resources. In particular, an amount of memory from the memory manager, but also memory in the network stack.

What we want to do quite soon is to make streaming programs more elastic. Consider for example the case that you have 16 slots on 4 machines, a machine fails, and you have no spare resources. In that case Flink should recognize that no spare resource can be acquired, and scale the job in. Since you have only 12 slots left, the parallelism of the mappers is reduced to 12, and the source task that was on the failed machine is moved to a slot on another machine.

It is important that the guaranteed resources for each task do not change when scaling in, to keep behavior predictable. In this case, each slot will still at most host 1 source, 1 mapper, and 1 sink, as did some of the slots before. That is also the reason why the slots are per TaskManager, and not global, to associate them with a constant set of resources (mainly memory).
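
The scale-in example above comes down to simple arithmetic, sketched here in plain Java. This only illustrates the planned behavior described in this message and is not an existing Flink API: `ScaleIn`, `availableSlots` and `scaledParallelism` are hypothetical names, and the rule "parallelism is capped by the number of remaining slots" is an assumption drawn from the example.

```java
public class ScaleIn {

    /** Total slots when every machine offers the same number of slots. */
    public static int availableSlots(int machines, int slotsPerMachine) {
        return machines * slotsPerMachine;
    }

    /** An operator keeps its parallelism unless there are too few slots left. */
    public static int scaledParallelism(int desiredParallelism, int availableSlots) {
        return Math.min(desiredParallelism, availableSlots);
    }

    public static void main(String[] args) {
        int slotsAfterFailure = availableSlots(3, 4); // 4 machines with 4 slots each, one fails
        System.out.println(slotsAfterFailure);                        // prints 12
        System.out.println(scaledParallelism(16, slotsAfterFailure)); // mappers: prints 12
        System.out.println(scaledParallelism(4, slotsAfterFailure));  // sources: prints 4
    }
}
```

Because each slot keeps a fixed share of resources, the 12 remaining mappers run under the same guarantees as before the failure.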


Greetings,
Stephan



On Thu, Feb 4, 2016 at 9:54 AM, Gwenhael Pasquiers <gw...@ericsson.com> wrote:
Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 sinks)?

Or is there a way to set the number of slots per TaskManager instead of globally, so that they are at least equally dispatched among the nodes?

As for the sink deployment: that’s not good news. We will have a non-negligible overhead: all the data generated by 3 of the 4 nodes will be sent to another node instead of being sent to the “local” sink. Network I/O has a price.

Do you have some sort of “topology” feature on the roadmap? Maybe a listener on the JobManager / env that would be triggered, asking us on which node we would prefer each task to be deployed. That way you keep the standard behavior, don’t have to write a complicated generic optimized algorithm, and let the user make their own choices. Should I create a JIRA?

For the time being we could start the application 4 times, once per node, but that’s not pretty at all ☺

B.R.

From: Till Rohrmann [mailto:trohrmann@apache.org]
Sent: Wednesday, February 3, 2016 17:58
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes


Hi Gwenhäel,

If you set the number of slots for each TaskManager to 4, then all of your mappers will be evenly spread out. The sources should also be evenly spread out. However, since the sinks depend on all mappers, where they are deployed will most likely be random. So you might end up with 4 sink tasks on one machine.

Cheers,
Till

On Wed, Feb 3, 2016 at 4:31 PM, Gwenhael Pasquiers <gw...@ericsson.com> wrote:
It is one type of mapper with a parallelism of 16.
It's the same for the sinks and sources (parallelism of 4).

The settings are:
env.setParallelism(4)
mapper.setParallelism(env.getParallelism() * 4)

We mean to have X mapper tasks per source / sink

The mapper is doing some heavy computation and we have only 4 kafka partitions. That's why we need more mappers than sources / sinks


-----Original Message-----
From: Aljoscha Krettek [mailto:aljoscha@apache.org]
Sent: Wednesday, February 3, 2016 16:26
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi Gwenhäel,
when you say 16 maps, are we talking about one mapper with parallelism 16 or 16 unique map operators?

Regards,
Aljoscha
> On 03 Feb 2016, at 15:48, Gwenhael Pasquiers <gw...@ericsson.com> wrote:
>
> Hi,
>
> We try to deploy an application with the following “architecture” :
>
> 4 kafka sources => 16 maps => 4 kafka sinks, on 4 nodes, with 24 slots (we disabled operator chaining).
>
> So we’d like on each node :
> 1x source => 4x map => 1x sink
>
> That way there are no exchanges between different instances of flink and performances would be optimal.
>
> But we get (according to the flink GUI and the Host column when looking at the details of each task) :
>
> Node 1 : 1 source =>  2 map
> Node 2 : 1 source =>  1 map
> Node 3 : 1 source =>  1 map
> Node 4 : 1 source =>  12 maps => 4 sinks
>
> (I think no comments are needed ☺)
>
> The Web UI says that there are 24 slots and they are all used, but they don’t seem evenly dispatched …
>
> How could we make Flink deploy the tasks the way we want ?
>
> B.R.
>
> Gwen’




Re: Distribution of sinks among the nodes

Posted by Stephan Ewen <se...@apache.org>.
To your other question, there are two things in Flink:

(1) Chaining. Tasks are folded together into one task, run by one thread.

(2) Resource groups: Tasks stay separate, have separate threads, but share
a slot (which means share memory resources). See the link in my previous
mail for an explanation concerning those.

Greetings,
Stephan


On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Gwen!
>
> You actually need not 24 slots, but only as many as the highest
> parallelism is (16). Slots do not hold individual tasks, but "pipelines".
>
> Here is an illustration how that works.
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
>
> You can control whether a task can share the slot with the previous task
> with the function "startNewResourceGroup()" in the streaming API. Sharing
> slots makes a few things easier to reason about; in particular, when you
> add operators to a program, you need not immediately add new machines.
>
>
> How to solve your program case
> --------------------------------------------
>
> We can actually make a pretty simple addition to Flink that will cause the
> tasks to be locally connected, which in turn will cause the scheduler to
> distribute them like you intend.
> Rather than let the 4 sources rebalance across all 16 mappers, each one
> should redistribute to 4 local mappers, and these 4 mappers should send
> data to one local sink each.
>
> We'll try and add that today and ping you once it is in.
>
> The following would be sample code to use this:
>
> env.setParallelism(4);
>
> env
>     .addSource(kafkaSource)
>     .partitionFan()
>     .map(mapper).setParallelism(16)
>     .partitionFan()
>     .addSink(kafkaSink);
>
>
>
> A bit of background on why the mechanism is the way it is right now
> ----------------------------------------------------------------------------------------------
>
> You can think of a slot as a slice of resources. In particular, an amount
> of memory from the memory manager, but also memory in the network stack.
>
> What we want to do quite soon is to make streaming programs more elastic.
> Consider for example the case that you have 16 slots on 4 machines, a
> machine fails, and you have no spare resources. In that case Flink should
> recognize that no spare resource can be acquired, and scale the job in.
> Since you have only 12 slots left, the parallelism of the mappers is
> reduced to 12, and the source task that was on the failed machine is moved
> to a slot on another machine.
>
> It is important that the guaranteed resources for each task do not change
> when scaling in, to keep behavior predictable. In this case, each slot will
> still at most host 1 source, 1 mapper, and 1 sink, as did some of the slots
> before. That is also the reason why the slots are per TaskManager, and not
> global, to associate them with a constant set of resources (mainly memory).
>
>
> Greetings,
> Stephan

Re: Distribution of sinks among the nodes

Posted by Stephan Ewen <se...@apache.org>.
Hi Gwen!

You actually do not need 24 slots, only as many as the highest parallelism
(16). Slots do not hold individual tasks, but "pipelines".

Here is an illustration of how that works:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
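
To make the slot arithmetic concrete, here is a small plain-Java sketch. It is not Flink code: the class and method names are made up for illustration, and it only models the counting rule described above (one slot holds one parallel slice of the whole pipeline when slots are shared, one subtask when they are not).

```java
import java.util.Arrays;

/** Counts the slots a job needs, with and without slot sharing (illustrative only). */
final class SlotCount {

    /** With slot sharing, the job needs as many slots as its highest operator parallelism. */
    static int withSharing(int... parallelism) {
        return Arrays.stream(parallelism).max().orElse(0);
    }

    /** Without slot sharing, every parallel subtask occupies a slot of its own. */
    static int withoutSharing(int... parallelism) {
        return Arrays.stream(parallelism).sum();
    }

    public static void main(String[] args) {
        // 4 sources, 16 mappers, 4 sinks, as in this thread.
        System.out.println("with sharing:    " + withSharing(4, 16, 4));
        System.out.println("without sharing: " + withoutSharing(4, 16, 4));
    }
}
```

For the job in this thread, the first number is the 16 mentioned above and the second is the 24 from the original question.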

You can control whether a task can share the slot with the previous task
with the function "startNewResourceGroup()" in the streaming API. Sharing
slots makes a few things easier to reason about; in particular, when you
add operators to a program, you need not immediately add new machines.


How to solve your program case
--------------------------------------------

We can actually make a pretty simple addition to Flink that will cause the
tasks to be locally connected, which in turn will cause the scheduler to
distribute them like you intend.
Rather than let the 4 sources rebalance across all 16 mappers, each one
should redistribute to 4 local mappers, and these 4 mappers should send
data to one local sink each.

We'll try and add that today and ping you once it is in.

The following would be sample code to use this:

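env.setParallelism(4);

env
    .addSource(kafkaSource)          // parallelism 4 (the default set above)
    .partitionFan()                  // proposed: each source feeds only its 4 local mappers
    .map(mapper).setParallelism(16)
    .partitionFan()                  // proposed: each group of 4 mappers feeds one local sink
    .addSink(kafkaSink);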
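
The wiring that the proposed "partitionFan()" is meant to produce can be sketched in plain Java. This is only a model of the intended connection pattern, not the Flink implementation. The class name and the peerOf helper are hypothetical, and the sketch assumes the higher parallelism is an exact multiple of the lower one (16 and 4 here).

```java
/** Models the local fan-out / fan-in pattern described above (illustrative only). */
final class LocalFanOut {

    /**
     * Returns the index of the low-parallelism task (source or sink) that is
     * paired with subtask manyIndex of the high-parallelism operator.
     * Assumes many is an exact multiple of few.
     */
    static int peerOf(int manyIndex, int many, int few) {
        if (few <= 0 || many % few != 0) {
            throw new IllegalArgumentException("many must be a multiple of few");
        }
        if (manyIndex < 0 || manyIndex >= many) {
            throw new IllegalArgumentException("subtask index out of range");
        }
        return manyIndex / (many / few);
    }

    public static void main(String[] args) {
        int mappers = 16;
        int sourcesAndSinks = 4;
        for (int m = 0; m < mappers; m++) {
            int peer = peerOf(m, mappers, sourcesAndSinks);
            // Each mapper talks to exactly one source and one sink.
            System.out.println("source " + peer + " -> mapper " + m + " -> sink " + peer);
        }
    }
}
```

Because every mapper has exactly one upstream source and one downstream sink, a scheduler that prefers local connections can place each group of 1 source, 4 mappers, and 1 sink on the same machine.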



A bit of background on why the mechanism is the way it is right now
----------------------------------------------------------------------------------------------

You can think of a slot as a slice of resources: in particular, an amount
of memory from the memory manager, but also memory in the network stack.

What we want to do quite soon is to make streaming programs more elastic.
Consider for example the case that you have 16 slots on 4 machines, a
machine fails, and you have no spare resources. In that case Flink should
recognize that no spare resource can be acquired, and scale the job in.
Since you have only 12 slots left, the parallelism of the mappers is
reduced to 12, and the source task that was on the failed machine is moved
to a slot on another machine.

It is important that the guaranteed resources for each task do not change
when scaling in, to keep behavior predictable. In this case, each slot will
still at most host 1 source, 1 mapper, and 1 sink, as did some of the slots
before. That is also the reason why the slots are per TaskManager, and not
global, to associate them with a constant set of resources (mainly memory).
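
The scale-in example above can be worked through with a few lines of plain Java. This only illustrates the arithmetic of the planned behavior described here, not an existing Flink feature, and the class and method names are hypothetical.

```java
/** Works through the scale-in arithmetic from the example above (illustrative only). */
final class ScaleIn {

    /**
     * Parallelism an operator can keep after some machines fail, assuming
     * every machine offers the same number of slots and no spares exist.
     */
    static int parallelismAfterFailure(int desired, int machines,
                                       int slotsPerMachine, int failedMachines) {
        int remainingSlots = Math.max(0, machines - failedMachines) * slotsPerMachine;
        return Math.min(desired, remainingSlots);
    }

    public static void main(String[] args) {
        // 16 slots on 4 machines, one machine fails: 12 slots remain.
        System.out.println("mappers: " + parallelismAfterFailure(16, 4, 4, 1));
        // Sources and sinks (parallelism 4) still fit and are only moved.
        System.out.println("sources: " + parallelismAfterFailure(4, 4, 4, 1));
    }
}
```

Each remaining slot still hosts at most one source, one mapper, and one sink, so the resources available to any single task are the same as before the failure.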


Greetings,
Stephan




RE: Distribution of sinks among the nodes

Posted by Gwenhael Pasquiers <gw...@ericsson.com>.
Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 sinks)?

Or is there a way to set the number of slots per TaskManager instead of globally, so that they are at least equally dispatched among the nodes?

As for the sink deployment: that’s not good news. We will have a non-negligible overhead: all the data generated by 3 of the 4 nodes will be sent to another node instead of being sent to the “local” sink. Network I/O has a price.

Do you have some sort of “topology” feature on the roadmap? Maybe a listener on the JobManager / env that would be triggered, asking us on which node we would prefer each task to be deployed. That way you keep the standard behavior, don’t have to write a complicated generic optimized algorithm, and let the user make their own choices. Should I create a JIRA?

For the time being we could start the application 4 times, once per node, but that’s not pretty at all ☺

B.R.



Re: Distribution of sinks among the nodes

Posted by Till Rohrmann <tr...@apache.org>.
Hi Gwenhäel,

If you set the number of slots for each TaskManager to 4, then all of your
mappers will be evenly spread out. The sources should also be evenly spread
out. However, since the sinks depend on all mappers, where they are deployed
will most likely be random. So you might end up with 4 sink tasks on one
machine.

Cheers,
Till


RE: Distribution of sinks among the nodes

Posted by Gwenhael Pasquiers <gw...@ericsson.com>.
It is one type of mapper with a parallelism of 16.
It's the same for the sinks and sources (parallelism of 4).

The settings are:
env.setParallelism(4)
mapper.setParallelism(env.getParallelism() * 4)

We mean to have X mapper tasks per source / sink

The mapper is doing some heavy computation and we have only 4 kafka partitions. That's why we need more mappers than sources / sinks




Re: Distribution of sinks among the nodes

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Gwenhäel,
when you say 16 maps, are we talking about one mapper with parallelism 16 or 16 unique map operators?

Regards,
Aljoscha