Posted to user@flink.apache.org by Aeden Jameson <ae...@gmail.com> on 2021/03/10 18:14:17 UTC

Evenly Spreading Out Source Tasks

    I have a cluster with 18 task managers with 4 task slots each, running a
job whose source/sink(s) are declared with FlinkSQL using the Kafka
connector. The topic being read has 36 partitions. The problem I'm
observing is that the subtasks for the sources are not evenly
distributed. For example, 1 task manager will have 4 active source
subtasks and other TMs none. Is there a way to force each task
manager to have 2 active source subtasks? I tried using the setting
cluster.evenly-spread-out-slots: true, but that didn't have the
desired effect.

-- 
Thank you,
Aeden

Re: Evenly Spreading Out Source Tasks

Posted by Aeden Jameson <ae...@gmail.com>.
There may be a slight misunderstanding: all the FlinkSQL tasks _were_
set at a parallelism of 72 (18 nodes * 4 slots). I was hoping that the
setting cluster.evenly-spread-out-slots would spread out the active
Kafka consumers evenly among the TMs, given the topic has 36
partitions, but I now realize that doesn't necessarily make sense. I
have since reduced the cluster to 9 pods with 4 slots each, so the 36
slots match the 36 partitions and every slot hosts exactly one source
subtask, and things run well. I'm learning the hard way. :) Thanks for
your time.

Unfortunately I can't share the job manager logs.

On Mon, Mar 15, 2021 at 8:37 PM Xintong Song <to...@gmail.com> wrote:
>
> If all the tasks have the same parallelism 36, your job should only allocate 36 slots. The evenly-spread-out-slots option should help in your case.
>
> Is it possible for you to share the complete jobmanager logs?
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Mar 16, 2021 at 12:46 AM Aeden Jameson <ae...@gmail.com> wrote:
>>
>> Hi Xintong,
>>
>>     Thanks for replying.  Yes, you understood my scenario. Every task
>> has the same parallelism since we're using FlinkSql unless there is a
>> way to change the parallelism of the source task that I have missed.
>> Your explanation of the setting makes sense and is what I ended up
>> concluding. Assuming one can't change the parallelism of FlinkSQL
>> tasks other than the sink-parallelism option I've concluded when using
>> FlinkSQL that have to plan at the cluster level. e.g. Reduce the task
>> slots, increase the partitions, reduce the TM's (possibily making them
>> bigger) etc...
>>
>> Aeden
>>
>> On Sun, Mar 14, 2021 at 10:41 PM Xintong Song <to...@gmail.com> wrote:
>> >
>> > Hi Aeden,
>> >
>> > IIUC, the topic being read has 36 partitions means that your source task has a parallelism of 36. What's the parallelism of other tasks? Is the job taking use of all the 72 (18 TMs * 4 slots/TM) slots?
>> >
>> > I'm afraid currently there's no good way to guarantee subtasks of a task are spread out evenly.
>> >
>> > The configuration option you mentioned makes sure slots are allocated from TMs evenly, it does not affect how tasks are distributed over the allocated slots.
>> > E.g., say your job has two tasks A & B, with parallelism 36 & 54 respectively. That means, with the default slot sharing strategy, your job needs 54 slots in total to be executed. With the configuration enabled, it is guaranteed that for each TM 3 slots are occupied. For B (parallelism 54), there's a subtask deployed in each slot, thus 3 subtasks on each TM. As for A, there're only 36 slots containing a subtask of it, and there's no guarantee which 36 out of the 54 contain it.
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> >
>> >
>> > On Mon, Mar 15, 2021 at 3:54 AM Chesnay Schepler <ch...@apache.org> wrote:
>> >>
>> >> Is this a brand-new job, with the cluster having all 18 TMs at the time
>> >> of submission? (or did you add more TMs while the job was running)
>> >>
>> >> On 3/12/2021 5:47 PM, Aeden Jameson wrote:
>> >> > Hi Matthias,
>> >> >
>> >> > Yes, all the task managers have the same hardware/memory configuration.
>> >> >
>> >> > Aeden
>> >> >
>> >> > On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl <ma...@ververica.com> wrote:
>> >> >> Hi Aeden,
>> >> >> just to be sure: All task managers have the same hardware/memory configuration, haven't they? I'm not 100% sure whether this affects the slot selection in the end, but it looks like this parameter has also an influence on the slot matching strategy preferring slots with less utilization of resources [1].
>> >> >>
>> >> >> I'm gonna add Chesnay to the thread. He might have more insights here. @Chesnay are there any other things that might affect the slot selection when actually trying to evenly spread out the slots?
>> >> >>
>> >> >> Matthias
>> >> >>
>> >> >> [1] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
>> >> >>
>> >> >> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson <ae...@gmail.com> wrote:
>> >> >>> Hi Arvid,
>> >> >>>
>> >> >>>    Thanks for responding. I did check the configuration tab of the job
>> >> >>> manager and the setting cluster.evenly-spread-out-slots: true is
>> >> >>> there. However I'm still observing unevenness in the distribution of
>> >> >>> source tasks. Perhaps this additional information could shed light.
>> >> >>>
>> >> >>> Version: 1.12.1
>> >> >>> Deployment Mode: Application
>> >> >>> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
>> >> >>> Flink operator https://github.com/lyft/flinkk8soperator
>> >> >>>
>> >> >>> I did place the setting under the flinkConfig section,
>> >> >>>
>> >> >>> apiVersion: flink.k8s.io/v1beta1
>> >> >>> ....
>> >> >>> spec:
>> >> >>>    flinkConfig:
>> >> >>>      cluster.evenly-spread-out-slots: true
>> >> >>>      high-availability: zookeeper
>> >> >>>      ...
>> >> >>>      state.backend: filesystem
>> >> >>>      ...
>> >> >>>    jobManagerConfig:
>> >> >>>      envConfig:
>> >> >>>          ....
>> >> >>>
>> >> >>> Would you explain how the setting ends up evenly distributing active
>> >> >>> kafka consumers? Is it a result of just assigning tasks toTM1, TM2,
>> >> >>> TM3 ... TM18 in order and starting again. In my case I have 36
>> >> >>> partitions and 18 nodes so after the second pass in assignment I would
>> >> >>> end up with 2 subtasks in the consumer group on each TM. And then
>> >> >>> subsequent passes result in inactive consumers.
>> >> >>>
>> >> >>>
>> >> >>> Thank you,
>> >> >>> Aeden
>> >> >>>
>> >> >>> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise <ar...@apache.org> wrote:
>> >> >>>> Hi Aeden,
>> >> >>>>
>> >> >>>> the option that you mentioned should have actually caused your desired behavior. Can you double-check that it's set for the job (you can look at the config in the Flink UI to be 100% sure).
>> >> >>>>
>> >> >>>> Another option is to simply give all task managers 2 slots. In that way, the scheduler can only evenly distribute.
>> >> >>>>
>> >> >>>> On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <ae...@gmail.com> wrote:
>> >> >>>>>      I have a cluster with 18 task managers 4 task slots each running a
>> >> >>>>> job whose source/sink(s) are declared with FlinkSQL using the Kafka
>> >> >>>>> connector. The topic being read has 36 partitions. The problem I'm
>> >> >>>>> observing is that the subtasks for the sources are not evenly
>> >> >>>>> distributed. For example, 1 task manager will have 4 active source
>> >> >>>>> subtasks and other TM's none. Is there a way to force  each task
>> >> >>>>> manager to have 2 active source subtasks.  I tried using the setting
>> >> >>>>> cluster.evenly-spread-out-slots: true , but that didn't have the
>> >> >>>>> desired effect.
>> >> >>>>>
>> >> >>>>> --
>> >> >>>>> Thank you,
>> >> >>>>> Aeden
>> >>
>> >>



-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful

Re: Evenly Spreading Out Source Tasks

Posted by Xintong Song <to...@gmail.com>.
If all the tasks have the same parallelism of 36, your job should only
allocate 36 slots. The evenly-spread-out-slots option should help in your
case.

Is it possible for you to share the complete jobmanager logs?


Thank you~

Xintong Song
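
For illustration, a minimal sketch of the setup Xintong describes, written in
the same flinkConfig style Aeden posts further down in the thread. The use of
parallelism.default to give every task the same parallelism of 36 is an
assumption for the example, not something confirmed in the thread:

    flinkConfig:
      # Uniform parallelism of 36 means the job needs 36 slots in total.
      parallelism.default: 36
      # Spread those 36 slot allocations across the TMs; with 18 TMs of
      # 4 slots each, that should come out to 2 occupied slots per TM.
      cluster.evenly-spread-out-slots: true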



On Tue, Mar 16, 2021 at 12:46 AM Aeden Jameson <ae...@gmail.com>
wrote:

> Hi Xintong,
>
>     Thanks for replying.  Yes, you understood my scenario. Every task
> has the same parallelism since we're using FlinkSql unless there is a
> way to change the parallelism of the source task that I have missed.
> Your explanation of the setting makes sense and is what I ended up
> concluding. Assuming one can't change the parallelism of FlinkSQL
> tasks other than the sink-parallelism option I've concluded when using
> FlinkSQL that have to plan at the cluster level. e.g. Reduce the task
> slots, increase the partitions, reduce the TM's (possibily making them
> bigger) etc...
>
> Aeden
>
> On Sun, Mar 14, 2021 at 10:41 PM Xintong Song <to...@gmail.com>
> wrote:
> >
> > Hi Aeden,
> >
> > IIUC, the topic being read has 36 partitions means that your source task
> has a parallelism of 36. What's the parallelism of other tasks? Is the job
> taking use of all the 72 (18 TMs * 4 slots/TM) slots?
> >
> > I'm afraid currently there's no good way to guarantee subtasks of a task
> are spread out evenly.
> >
> > The configuration option you mentioned makes sure slots are allocated
> from TMs evenly, it does not affect how tasks are distributed over the
> allocated slots.
> > E.g., say your job has two tasks A & B, with parallelism 36 & 54
> respectively. That means, with the default slot sharing strategy, your job
> needs 54 slots in total to be executed. With the configuration enabled, it
> is guaranteed that for each TM 3 slots are occupied. For B (parallelism
> 54), there's a subtask deployed in each slot, thus 3 subtasks on each TM.
> As for A, there're only 36 slots containing a subtask of it, and there's no
> guarantee which 36 out of the 54 contain it.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Mon, Mar 15, 2021 at 3:54 AM Chesnay Schepler <ch...@apache.org>
> wrote:
> >>
> >> Is this a brand-new job, with the cluster having all 18 TMs at the time
> >> of submission? (or did you add more TMs while the job was running)
> >>
> >> On 3/12/2021 5:47 PM, Aeden Jameson wrote:
> >> > Hi Matthias,
> >> >
> >> > Yes, all the task managers have the same hardware/memory
> configuration.
> >> >
> >> > Aeden
> >> >
> >> > On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl <ma...@ververica.com>
> wrote:
> >> >> Hi Aeden,
> >> >> just to be sure: All task managers have the same hardware/memory
> configuration, haven't they? I'm not 100% sure whether this affects the
> slot selection in the end, but it looks like this parameter has also an
> influence on the slot matching strategy preferring slots with less
> utilization of resources [1].
> >> >>
> >> >> I'm gonna add Chesnay to the thread. He might have more insights
> here. @Chesnay are there any other things that might affect the slot
> selection when actually trying to evenly spread out the slots?
> >> >>
> >> >> Matthias
> >> >>
> >> >> [1]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
> >> >>
> >> >> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson <
> aeden.jameson@gmail.com> wrote:
> >> >>> Hi Arvid,
> >> >>>
> >> >>>    Thanks for responding. I did check the configuration tab of the
> job
> >> >>> manager and the setting cluster.evenly-spread-out-slots: true is
> >> >>> there. However I'm still observing unevenness in the distribution of
> >> >>> source tasks. Perhaps this additional information could shed light.
> >> >>>
> >> >>> Version: 1.12.1
> >> >>> Deployment Mode: Application
> >> >>> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
> >> >>> Flink operator https://github.com/lyft/flinkk8soperator
> >> >>>
> >> >>> I did place the setting under the flinkConfig section,
> >> >>>
> >> >>> apiVersion: flink.k8s.io/v1beta1
> >> >>> ....
> >> >>> spec:
> >> >>>    flinkConfig:
> >> >>>      cluster.evenly-spread-out-slots: true
> >> >>>      high-availability: zookeeper
> >> >>>      ...
> >> >>>      state.backend: filesystem
> >> >>>      ...
> >> >>>    jobManagerConfig:
> >> >>>      envConfig:
> >> >>>          ....
> >> >>>
> >> >>> Would you explain how the setting ends up evenly distributing active
> >> >>> kafka consumers? Is it a result of just assigning tasks toTM1, TM2,
> >> >>> TM3 ... TM18 in order and starting again. In my case I have 36
> >> >>> partitions and 18 nodes so after the second pass in assignment I
> would
> >> >>> end up with 2 subtasks in the consumer group on each TM. And then
> >> >>> subsequent passes result in inactive consumers.
> >> >>>
> >> >>>
> >> >>> Thank you,
> >> >>> Aeden
> >> >>>
> >> >>> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise <ar...@apache.org>
> wrote:
> >> >>>> Hi Aeden,
> >> >>>>
> >> >>>> the option that you mentioned should have actually caused your
> desired behavior. Can you double-check that it's set for the job (you can
> look at the config in the Flink UI to be 100% sure).
> >> >>>>
> >> >>>> Another option is to simply give all task managers 2 slots. In
> that way, the scheduler can only evenly distribute.
> >> >>>>
> >> >>>> On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <
> aeden.jameson@gmail.com> wrote:
> >> >>>>>      I have a cluster with 18 task managers 4 task slots each
> running a
> >> >>>>> job whose source/sink(s) are declared with FlinkSQL using the
> Kafka
> >> >>>>> connector. The topic being read has 36 partitions. The problem I'm
> >> >>>>> observing is that the subtasks for the sources are not evenly
> >> >>>>> distributed. For example, 1 task manager will have 4 active source
> >> >>>>> subtasks and other TM's none. Is there a way to force  each task
> >> >>>>> manager to have 2 active source subtasks.  I tried using the
> setting
> >> >>>>> cluster.evenly-spread-out-slots: true , but that didn't have the
> >> >>>>> desired effect.
> >> >>>>>
> >> >>>>> --
> >> >>>>> Thank you,
> >> >>>>> Aeden
> >>
> >>
>

Re: Evenly Spreading Out Source Tasks

Posted by Aeden Jameson <ae...@gmail.com>.
Hi Xintong,

    Thanks for replying. Yes, you understood my scenario. Every task
has the same parallelism since we're using FlinkSQL, unless there is a
way to change the parallelism of the source task that I have missed.
Your explanation of the setting makes sense and is what I ended up
concluding. Assuming one can't change the parallelism of FlinkSQL
tasks other than through the sink.parallelism option, I've concluded
that when using FlinkSQL one has to plan at the cluster level, e.g.
reduce the task slots, increase the partitions, or reduce the number
of TMs (possibly making them bigger).

Aeden
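
For illustration, a minimal FlinkSQL sketch of the sink parallelism knob
Aeden refers to. The table, topic, broker address, and format below are
invented for the example; the one real piece is the Kafka connector's
'sink.parallelism' option (available in Flink 1.12), which has no
source-side equivalent here — which is the point above:

    CREATE TABLE enriched_events (
      id STRING,
      payload STRING
    ) WITH (
      'connector' = 'kafka',
      -- hypothetical topic and brokers
      'topic' = 'enriched-events',
      'properties.bootstrap.servers' = 'kafka:9092',
      'format' = 'json',
      -- sets the parallelism of the Kafka sink operator only
      'sink.parallelism' = '36'
    );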

On Sun, Mar 14, 2021 at 10:41 PM Xintong Song <to...@gmail.com> wrote:
>
> Hi Aeden,
>
> IIUC, the topic being read has 36 partitions means that your source task has a parallelism of 36. What's the parallelism of other tasks? Is the job taking use of all the 72 (18 TMs * 4 slots/TM) slots?
>
> I'm afraid currently there's no good way to guarantee subtasks of a task are spread out evenly.
>
> The configuration option you mentioned makes sure slots are allocated from TMs evenly, it does not affect how tasks are distributed over the allocated slots.
> E.g., say your job has two tasks A & B, with parallelism 36 & 54 respectively. That means, with the default slot sharing strategy, your job needs 54 slots in total to be executed. With the configuration enabled, it is guaranteed that for each TM 3 slots are occupied. For B (parallelism 54), there's a subtask deployed in each slot, thus 3 subtasks on each TM. As for A, there're only 36 slots containing a subtask of it, and there's no guarantee which 36 out of the 54 contain it.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Mar 15, 2021 at 3:54 AM Chesnay Schepler <ch...@apache.org> wrote:
>>
>> Is this a brand-new job, with the cluster having all 18 TMs at the time
>> of submission? (or did you add more TMs while the job was running)
>>
>> On 3/12/2021 5:47 PM, Aeden Jameson wrote:
>> > Hi Matthias,
>> >
>> > Yes, all the task managers have the same hardware/memory configuration.
>> >
>> > Aeden
>> >
>> > On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl <ma...@ververica.com> wrote:
>> >> Hi Aeden,
>> >> just to be sure: All task managers have the same hardware/memory configuration, haven't they? I'm not 100% sure whether this affects the slot selection in the end, but it looks like this parameter has also an influence on the slot matching strategy preferring slots with less utilization of resources [1].
>> >>
>> >> I'm gonna add Chesnay to the thread. He might have more insights here. @Chesnay are there any other things that might affect the slot selection when actually trying to evenly spread out the slots?
>> >>
>> >> Matthias
>> >>
>> >> [1] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
>> >>
>> >> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson <ae...@gmail.com> wrote:
>> >>> Hi Arvid,
>> >>>
>> >>>    Thanks for responding. I did check the configuration tab of the job
>> >>> manager and the setting cluster.evenly-spread-out-slots: true is
>> >>> there. However I'm still observing unevenness in the distribution of
>> >>> source tasks. Perhaps this additional information could shed light.
>> >>>
>> >>> Version: 1.12.1
>> >>> Deployment Mode: Application
>> >>> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
>> >>> Flink operator https://github.com/lyft/flinkk8soperator
>> >>>
>> >>> I did place the setting under the flinkConfig section,
>> >>>
>> >>> apiVersion: flink.k8s.io/v1beta1
>> >>> ....
>> >>> spec:
>> >>>    flinkConfig:
>> >>>      cluster.evenly-spread-out-slots: true
>> >>>      high-availability: zookeeper
>> >>>      ...
>> >>>      state.backend: filesystem
>> >>>      ...
>> >>>    jobManagerConfig:
>> >>>      envConfig:
>> >>>          ....
>> >>>
>> >>> Would you explain how the setting ends up evenly distributing active
>> >>> kafka consumers? Is it a result of just assigning tasks toTM1, TM2,
>> >>> TM3 ... TM18 in order and starting again. In my case I have 36
>> >>> partitions and 18 nodes so after the second pass in assignment I would
>> >>> end up with 2 subtasks in the consumer group on each TM. And then
>> >>> subsequent passes result in inactive consumers.
>> >>>
>> >>>
>> >>> Thank you,
>> >>> Aeden
>> >>>
>> >>> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise <ar...@apache.org> wrote:
>> >>>> Hi Aeden,
>> >>>>
>> >>>> the option that you mentioned should have actually caused your desired behavior. Can you double-check that it's set for the job (you can look at the config in the Flink UI to be 100% sure).
>> >>>>
>> >>>> Another option is to simply give all task managers 2 slots. In that way, the scheduler can only evenly distribute.
>> >>>>
>> >>>> On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <ae...@gmail.com> wrote:
>> >>>>>      I have a cluster with 18 task managers 4 task slots each running a
>> >>>>> job whose source/sink(s) are declared with FlinkSQL using the Kafka
>> >>>>> connector. The topic being read has 36 partitions. The problem I'm
>> >>>>> observing is that the subtasks for the sources are not evenly
>> >>>>> distributed. For example, 1 task manager will have 4 active source
>> >>>>> subtasks and other TM's none. Is there a way to force  each task
>> >>>>> manager to have 2 active source subtasks.  I tried using the setting
>> >>>>> cluster.evenly-spread-out-slots: true , but that didn't have the
>> >>>>> desired effect.
>> >>>>>
>> >>>>> --
>> >>>>> Thank you,
>> >>>>> Aeden
>>
>>

Re: Evenly Spreading Out Source Tasks

Posted by Xintong Song <to...@gmail.com>.
Hi Aeden,

IIUC, the topic being read having 36 partitions means that your source task
has a parallelism of 36. What's the parallelism of the other tasks? Is the
job making use of all 72 (18 TMs * 4 slots/TM) slots?

I'm afraid there's currently no good way to guarantee that the subtasks of a
task are spread out evenly.

The configuration option you mentioned makes sure slots are allocated from
TMs evenly; it does not affect how tasks are distributed over the allocated
slots.
E.g., say your job has two tasks A & B, with parallelism 36 & 54
respectively. That means, with the default slot sharing strategy, your job
needs 54 slots in total to be executed. With the configuration enabled, it
is guaranteed that 3 slots are occupied on each TM. For B (parallelism 54),
a subtask is deployed in each slot, thus 3 subtasks on each TM. As for A,
only 36 of the 54 slots contain a subtask of it, and there's no guarantee
which 36 those are.

Thank you~

Xintong Song



On Mon, Mar 15, 2021 at 3:54 AM Chesnay Schepler <ch...@apache.org> wrote:

> Is this a brand-new job, with the cluster having all 18 TMs at the time
> of submission? (or did you add more TMs while the job was running)
>
> On 3/12/2021 5:47 PM, Aeden Jameson wrote:
> > Hi Matthias,
> >
> > Yes, all the task managers have the same hardware/memory configuration.
> >
> > Aeden
> >
> > On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl <ma...@ververica.com>
> wrote:
> >> Hi Aeden,
> >> just to be sure: All task managers have the same hardware/memory
> configuration, haven't they? I'm not 100% sure whether this affects the
> slot selection in the end, but it looks like this parameter has also an
> influence on the slot matching strategy preferring slots with less
> utilization of resources [1].
> >>
> >> I'm gonna add Chesnay to the thread. He might have more insights here.
> @Chesnay are there any other things that might affect the slot selection
> when actually trying to evenly spread out the slots?
> >>
> >> Matthias
> >>
> >> [1]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
> >>
> >> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson <ae...@gmail.com>
> wrote:
> >>> Hi Arvid,
> >>>
> >>>    Thanks for responding. I did check the configuration tab of the job
> >>> manager and the setting cluster.evenly-spread-out-slots: true is
> >>> there. However I'm still observing unevenness in the distribution of
> >>> source tasks. Perhaps this additional information could shed light.
> >>>
> >>> Version: 1.12.1
> >>> Deployment Mode: Application
> >>> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
> >>> Flink operator https://github.com/lyft/flinkk8soperator
> >>>
> >>> I did place the setting under the flinkConfig section,
> >>>
> >>> apiVersion: flink.k8s.io/v1beta1
> >>> ....
> >>> spec:
> >>>    flinkConfig:
> >>>      cluster.evenly-spread-out-slots: true
> >>>      high-availability: zookeeper
> >>>      ...
> >>>      state.backend: filesystem
> >>>      ...
> >>>    jobManagerConfig:
> >>>      envConfig:
> >>>          ....
> >>>
> >>> Would you explain how the setting ends up evenly distributing active
> >>> kafka consumers? Is it a result of just assigning tasks toTM1, TM2,
> >>> TM3 ... TM18 in order and starting again. In my case I have 36
> >>> partitions and 18 nodes so after the second pass in assignment I would
> >>> end up with 2 subtasks in the consumer group on each TM. And then
> >>> subsequent passes result in inactive consumers.
> >>>
> >>>
> >>> Thank you,
> >>> Aeden
> >>>
> >>> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise <ar...@apache.org> wrote:
> >>>> Hi Aeden,
> >>>>
> >>>> the option that you mentioned should have actually caused your
> desired behavior. Can you double-check that it's set for the job (you can
> look at the config in the Flink UI to be 100% sure).
> >>>>
> >>>> Another option is to simply give all task managers 2 slots. In that
> way, the scheduler can only evenly distribute.
> >>>>
> >>>> On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <
> aeden.jameson@gmail.com> wrote:
> >>>>>      I have a cluster with 18 task managers 4 task slots each
> running a
> >>>>> job whose source/sink(s) are declared with FlinkSQL using the Kafka
> >>>>> connector. The topic being read has 36 partitions. The problem I'm
> >>>>> observing is that the subtasks for the sources are not evenly
> >>>>> distributed. For example, 1 task manager will have 4 active source
> >>>>> subtasks and other TM's none. Is there a way to force  each task
> >>>>> manager to have 2 active source subtasks.  I tried using the setting
> >>>>> cluster.evenly-spread-out-slots: true , but that didn't have the
> >>>>> desired effect.
> >>>>>
> >>>>> --
> >>>>> Thank you,
> >>>>> Aeden
>
>
>

Re: Evenly Spreading Out Source Tasks

Posted by Chesnay Schepler <ch...@apache.org>.
Is this a brand-new job, with the cluster having all 18 TMs at the time 
of submission? (or did you add more TMs while the job was running)

On 3/12/2021 5:47 PM, Aeden Jameson wrote:
> Hi Matthias,
>
> Yes, all the task managers have the same hardware/memory configuration.
>
> Aeden
>
> On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl <ma...@ververica.com> wrote:
>> Hi Aeden,
>> just to be sure: All task managers have the same hardware/memory configuration, haven't they? I'm not 100% sure whether this affects the slot selection in the end, but it looks like this parameter has also an influence on the slot matching strategy preferring slots with less utilization of resources [1].
>>
>> I'm gonna add Chesnay to the thread. He might have more insights here. @Chesnay are there any other things that might affect the slot selection when actually trying to evenly spread out the slots?
>>
>> Matthias
>>
>> [1] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
>>
>> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson <ae...@gmail.com> wrote:
>>> Hi Arvid,
>>>
>>>    Thanks for responding. I did check the configuration tab of the job
>>> manager and the setting cluster.evenly-spread-out-slots: true is
>>> there. However I'm still observing unevenness in the distribution of
>>> source tasks. Perhaps this additional information could shed light.
>>>
>>> Version: 1.12.1
>>> Deployment Mode: Application
>>> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
>>> Flink operator https://github.com/lyft/flinkk8soperator
>>>
>>> I did place the setting under the flinkConfig section,
>>>
>>> apiVersion: flink.k8s.io/v1beta1
>>> ....
>>> spec:
>>>    flinkConfig:
>>>      cluster.evenly-spread-out-slots: true
>>>      high-availability: zookeeper
>>>      ...
>>>      state.backend: filesystem
>>>      ...
>>>    jobManagerConfig:
>>>      envConfig:
>>>          ....
>>>
>>> Would you explain how the setting ends up evenly distributing active
>>> kafka consumers? Is it a result of just assigning tasks toTM1, TM2,
>>> TM3 ... TM18 in order and starting again. In my case I have 36
>>> partitions and 18 nodes so after the second pass in assignment I would
>>> end up with 2 subtasks in the consumer group on each TM. And then
>>> subsequent passes result in inactive consumers.
>>>
>>>
>>> Thank you,
>>> Aeden
>>>
>>> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise <ar...@apache.org> wrote:
>>>> Hi Aeden,
>>>>
>>>> the option that you mentioned should have actually caused your desired behavior. Can you double-check that it's set for the job (you can look at the config in the Flink UI to be 100% sure).
>>>>
>>>> Another option is to simply give all task managers 2 slots. In that way, the scheduler can only evenly distribute.
>>>>
>>>> On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <ae...@gmail.com> wrote:
>>>>>      I have a cluster with 18 task managers 4 task slots each running a
>>>>> job whose source/sink(s) are declared with FlinkSQL using the Kafka
>>>>> connector. The topic being read has 36 partitions. The problem I'm
>>>>> observing is that the subtasks for the sources are not evenly
>>>>> distributed. For example, 1 task manager will have 4 active source
>>>>> subtasks and other TM's none. Is there a way to force  each task
>>>>> manager to have 2 active source subtasks.  I tried using the setting
>>>>> cluster.evenly-spread-out-slots: true , but that didn't have the
>>>>> desired effect.
>>>>>
>>>>> --
>>>>> Thank you,
>>>>> Aeden



Re: Evenly Spreading Out Source Tasks

Posted by Aeden Jameson <ae...@gmail.com>.
Hi Matthias,

Yes, all the task managers have the same hardware/memory configuration.

Aeden

On Fri, Mar 12, 2021 at 3:25 AM Matthias Pohl <ma...@ververica.com> wrote:
>
> Hi Aeden,
> just to be sure: All task managers have the same hardware/memory configuration, haven't they? I'm not 100% sure whether this affects the slot selection in the end, but it looks like this parameter has also an influence on the slot matching strategy preferring slots with less utilization of resources [1].
>
> I'm gonna add Chesnay to the thread. He might have more insights here. @Chesnay are there any other things that might affect the slot selection when actually trying to evenly spread out the slots?
>
> Matthias
>
> [1] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141
>
> On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson <ae...@gmail.com> wrote:
>>
>> Hi Arvid,
>>
>>   Thanks for responding. I did check the configuration tab of the job
>> manager and the setting cluster.evenly-spread-out-slots: true is
>> there. However I'm still observing unevenness in the distribution of
>> source tasks. Perhaps this additional information could shed light.
>>
>> Version: 1.12.1
>> Deployment Mode: Application
>> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
>> Flink operator https://github.com/lyft/flinkk8soperator
>>
>> I did place the setting under the flinkConfig section,
>>
>> apiVersion: flink.k8s.io/v1beta1
>> ....
>> spec:
>>   flinkConfig:
>>     cluster.evenly-spread-out-slots: true
>>     high-availability: zookeeper
>>     ...
>>     state.backend: filesystem
>>     ...
>>   jobManagerConfig:
>>     envConfig:
>>         ....
>>
>> Would you explain how the setting ends up evenly distributing active
>> kafka consumers? Is it a result of just assigning tasks toTM1, TM2,
>> TM3 ... TM18 in order and starting again. In my case I have 36
>> partitions and 18 nodes so after the second pass in assignment I would
>> end up with 2 subtasks in the consumer group on each TM. And then
>> subsequent passes result in inactive consumers.
>>
>>
>> Thank you,
>> Aeden
>>
>> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise <ar...@apache.org> wrote:
>> >
>> > Hi Aeden,
>> >
>> > the option that you mentioned should have actually caused your desired behavior. Can you double-check that it's set for the job (you can look at the config in the Flink UI to be 100% sure).
>> >
>> > Another option is to simply give all task managers 2 slots. In that way, the scheduler can only evenly distribute.
>> >
>> > On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <ae...@gmail.com> wrote:
>> >>
>> >>     I have a cluster with 18 task managers 4 task slots each running a
>> >> job whose source/sink(s) are declared with FlinkSQL using the Kafka
>> >> connector. The topic being read has 36 partitions. The problem I'm
>> >> observing is that the subtasks for the sources are not evenly
>> >> distributed. For example, 1 task manager will have 4 active source
>> >> subtasks and other TM's none. Is there a way to force  each task
>> >> manager to have 2 active source subtasks.  I tried using the setting
>> >> cluster.evenly-spread-out-slots: true , but that didn't have the
>> >> desired effect.
>> >>
>> >> --
>> >> Thank you,
>> >> Aeden

Re: Evenly Spreading Out Source Tasks

Posted by Matthias Pohl <ma...@ververica.com>.
Hi Aeden,
just to be sure: all task managers have the same hardware/memory
configuration, don't they? I'm not 100% sure whether this affects the
slot selection in the end, but it looks like this parameter also has an
influence on the slot matching strategy, which prefers slots with lower
resource utilization [1].

I'm gonna add Chesnay to the thread. He might have more insights here.
@Chesnay are there any other things that might affect the slot selection
when actually trying to evenly spread out the slots?

Matthias

[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java#L141

On Fri, Mar 12, 2021 at 12:58 AM Aeden Jameson <ae...@gmail.com>
wrote:

> Hi Arvid,
>
>   Thanks for responding. I did check the configuration tab of the job
> manager and the setting cluster.evenly-spread-out-slots: true is
> there. However I'm still observing unevenness in the distribution of
> source tasks. Perhaps this additional information could shed light.
>
> Version: 1.12.1
> Deployment Mode: Application
> Deployment Type: Standalone,  Docker on Kubernetes using the Lyft
> Flink operator https://github.com/lyft/flinkk8soperator
>
> I did place the setting under the flinkConfig section,
>
> apiVersion: flink.k8s.io/v1beta1
> ....
> spec:
>   flinkConfig:
>     cluster.evenly-spread-out-slots: true
>     high-availability: zookeeper
>     ...
>     state.backend: filesystem
>     ...
>   jobManagerConfig:
>     envConfig:
>         ....
>
> Would you explain how the setting ends up evenly distributing active
> kafka consumers? Is it a result of just assigning tasks toTM1, TM2,
> TM3 ... TM18 in order and starting again. In my case I have 36
> partitions and 18 nodes so after the second pass in assignment I would
> end up with 2 subtasks in the consumer group on each TM. And then
> subsequent passes result in inactive consumers.
>
>
> Thank you,
> Aeden
>
> On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise <ar...@apache.org> wrote:
> >
> > Hi Aeden,
> >
> > the option that you mentioned should have actually caused your desired
> behavior. Can you double-check that it's set for the job (you can look at
> the config in the Flink UI to be 100% sure).
> >
> > Another option is to simply give all task managers 2 slots. In that way,
> the scheduler can only evenly distribute.
> >
> > On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <ae...@gmail.com>
> wrote:
> >>
> >>     I have a cluster with 18 task managers 4 task slots each running a
> >> job whose source/sink(s) are declared with FlinkSQL using the Kafka
> >> connector. The topic being read has 36 partitions. The problem I'm
> >> observing is that the subtasks for the sources are not evenly
> >> distributed. For example, 1 task manager will have 4 active source
> >> subtasks and other TM's none. Is there a way to force  each task
> >> manager to have 2 active source subtasks.  I tried using the setting
> >> cluster.evenly-spread-out-slots: true , but that didn't have the
> >> desired effect.
> >>
> >> --
> >> Thank you,
> >> Aeden

Re: Evenly Spreading Out Source Tasks

Posted by Aeden Jameson <ae...@gmail.com>.
Hi Arvid,

  Thanks for responding. I did check the configuration tab of the job
manager and the setting cluster.evenly-spread-out-slots: true is
there. However, I'm still observing unevenness in the distribution of
source tasks. Perhaps this additional information can shed some light.

Version: 1.12.1
Deployment Mode: Application
Deployment Type: Standalone, Docker on Kubernetes using the Lyft
Flink operator https://github.com/lyft/flinkk8soperator

I did place the setting under the flinkConfig section,

apiVersion: flink.k8s.io/v1beta1
....
spec:
  flinkConfig:
    cluster.evenly-spread-out-slots: true
    high-availability: zookeeper
    ...
    state.backend: filesystem
    ...
  jobManagerConfig:
    envConfig:
        ....

Would you explain how the setting ends up evenly distributing active
Kafka consumers? Is it a result of just assigning tasks to TM1, TM2,
TM3 ... TM18 in order and starting again? In my case I have 36
partitions and 18 nodes, so after the second pass of assignment I would
end up with 2 subtasks of the consumer group on each TM, and then
subsequent passes would result in inactive consumers.


Thank you,
Aeden

On Thu, Mar 11, 2021 at 5:26 AM Arvid Heise <ar...@apache.org> wrote:
>
> Hi Aeden,
>
> the option that you mentioned should have actually caused your desired behavior. Can you double-check that it's set for the job (you can look at the config in the Flink UI to be 100% sure).
>
> Another option is to simply give all task managers 2 slots. In that way, the scheduler can only evenly distribute.
>
> On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <ae...@gmail.com> wrote:
>>
>>     I have a cluster with 18 task managers 4 task slots each running a
>> job whose source/sink(s) are declared with FlinkSQL using the Kafka
>> connector. The topic being read has 36 partitions. The problem I'm
>> observing is that the subtasks for the sources are not evenly
>> distributed. For example, 1 task manager will have 4 active source
>> subtasks and other TM's none. Is there a way to force  each task
>> manager to have 2 active source subtasks.  I tried using the setting
>> cluster.evenly-spread-out-slots: true , but that didn't have the
>> desired effect.
>>
>> --
>> Thank you,
>> Aeden

Re: Evenly Spreading Out Source Tasks

Posted by Arvid Heise <ar...@apache.org>.
Hi Aeden,

the option that you mentioned should actually have caused your desired
behavior. Can you double-check that it's set for the job? (You can look at
the config in the Flink UI to be 100% sure.)

Another option is to simply give all task managers 2 slots. In that way,
the scheduler has no choice but to distribute evenly.
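
For illustration, a minimal sketch of Arvid's second suggestion, in the same
flinkConfig style that appears elsewhere in this thread. It assumes the slot
count is driven by the standard taskmanager.numberOfTaskSlots option rather
than an operator-specific field, and that the job's parallelism fits into
the resulting 36 slots:

    flinkConfig:
      # 2 slots per TM instead of 4: 18 TMs * 2 slots = 36 slots, so a
      # source with parallelism 36 lands exactly 2 active source
      # subtasks on every TM.
      taskmanager.numberOfTaskSlots: 2
      cluster.evenly-spread-out-slots: true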

On Wed, Mar 10, 2021 at 7:21 PM Aeden Jameson <ae...@gmail.com>
wrote:

>     I have a cluster with 18 task managers 4 task slots each running a
> job whose source/sink(s) are declared with FlinkSQL using the Kafka
> connector. The topic being read has 36 partitions. The problem I'm
> observing is that the subtasks for the sources are not evenly
> distributed. For example, 1 task manager will have 4 active source
> subtasks and other TM's none. Is there a way to force  each task
> manager to have 2 active source subtasks.  I tried using the setting
> cluster.evenly-spread-out-slots: true , but that didn't have the
> desired effect.
>
> --
> Thank you,
> Aeden
>