Posted to dev@flink.apache.org by "penguin." <bx...@126.com> on 2021/01/06 05:57:44 UTC

Task scheduling of Flink

Hello! Do you know how to modify the task scheduling method of Flink?

Re:Re: Re: Task scheduling of Flink

Posted by "penguin." <bx...@126.com>.
Hi Lasantha,


Thanks for your reply. I also found the method call chain you described.
Next, I'm going to study this code to see whether I can modify it to schedule tasks to a slot on a specified TaskManager.
However, reading the source code is very difficult for me, especially since it has so few comments.
It would be great if more people joined in to study and discuss this topic.
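
To make the goal concrete, here is a minimal sketch of what "prefer a slot on a specified TaskManager" could look like. Everything in it (SlotInfoSketch, PinnedSlotSelectorSketch, the "tm-1"/"tm-2" IDs) is a hypothetical stand-in and not a Flink class; it only illustrates the selection rule, not how it would be wired into Flink's slot allocation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a free slot; NOT a Flink class.
final class SlotInfoSketch {
    final String taskManagerId;
    final int slotIndex;

    SlotInfoSketch(String taskManagerId, int slotIndex) {
        this.taskManagerId = taskManagerId;
        this.slotIndex = slotIndex;
    }
}

// Hypothetical selector: take the first free slot on the requested
// TaskManager; if there is none, fall back to the first free slot (if any).
final class PinnedSlotSelectorSketch {
    static Optional<SlotInfoSketch> select(
            List<SlotInfoSketch> freeSlots, String preferredTaskManagerId) {
        for (SlotInfoSketch slot : freeSlots) {
            if (slot.taskManagerId.equals(preferredTaskManagerId)) {
                return Optional.of(slot);
            }
        }
        return freeSlots.stream().findFirst();
    }
}
```

For example, with free slots on "tm-1" and "tm-2", calling PinnedSlotSelectorSketch.select(freeSlots, "tm-2") returns the first "tm-2" slot. Whether the fallback should pick another TaskManager or fail the request is a design choice this sketch does not settle.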

Best,
Penguin

At 2021-01-08 06:58:54, "Lasantha Fernando" <la...@gmail.com> wrote:
>Hi Penguin,
>
>Jumping into this conversation since I worked on the same code base
>(Flink-1.11.2) for a recent project and might have a fresher memory of
>the method calls.
>
>I believe the chain of methods you have highlighted is correct. Obviously,
>the chain would differ based on the configuration and context. This is a
>stack trace that I derived for a scheduling call for a simple Word Count
>streaming application.
>
>selectBestSlotForProfile:46, LocationPreferenceSlotSelectionStrategy (org.apache.flink.runtime.jobmaster.slotpool)
>tryAllocateFromAvailable:275, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
>allocateMultiTaskSlot:470, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
>allocateSharedSlot:311, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
>internalAllocateSlot:160, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
>allocateSlotInternal:143, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
>allocateSlot:113, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
>allocateSlot:115, SlotProviderStrategy$NormalSlotProviderStrategy (org.apache.flink.runtime.executiongraph)
>lambda$allocateSlotsFor$0:104, DefaultExecutionSlotAllocator (org.apache.flink.runtime.scheduler)
>apply:-1, 90274328 (org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator$$Lambda$679)
>uniComposeStage:995, CompletableFuture (java.util.concurrent)
>thenCompose:2137, CompletableFuture (java.util.concurrent)
>allocateSlotsFor:102, DefaultExecutionSlotAllocator (org.apache.flink.runtime.scheduler)
>allocateSlots:339, DefaultScheduler (org.apache.flink.runtime.scheduler)
>allocateSlotsAndDeploy:312, DefaultScheduler (org.apache.flink.runtime.scheduler)
>allocateSlotsAndDeploy:76, EagerSchedulingStrategy (org.apache.flink.runtime.scheduler.strategy)
>startScheduling:52, EagerSchedulingStrategy (org.apache.flink.runtime.scheduler.strategy)
>startSchedulingInternal:173, DefaultScheduler (org.apache.flink.runtime.scheduler)
>startScheduling:461, SchedulerBase (org.apache.flink.runtime.scheduler)
>startScheduling:897, JobMaster (org.apache.flink.runtime.jobmaster)
>
>It might be useful to remote debug a local setup to get the exact stack
>trace/method-call-chain for your scenario. Navigating the code through
>method calls to figure out the execution sequence can be a bit tricky since
>Flink uses a lot of asynchronous calls.
>
>According to my understanding, the basic flow for scheduling goes like
>this: the JobMaster calls startScheduling on the scheduler (implemented in
>SchedulerBase, which DefaultScheduler extends), and that makes an internal
>call to the preset scheduling strategy. The scheduling strategy is set when
>the DefaultScheduler is created. The scheduling strategy basically
>populates some deployment option configurations (though it can be used to
>make more fine-grained scheduling changes), then tells the DefaultScheduler
>to allocate the slots and deploy. The DefaultScheduler then goes through
>another set of calls that allocate slots and select a slot for each
>execution vertex (while also considering slot selection strategies).
>
>This information is then sent to the TaskManager through an RPC call
>(via another chain of calls), and the TaskManager uses the task
>information to deploy the tasks according to the schedule.
>
>Hope this helps. I also hope someone from the community will correct me
>if I have stated something incorrectly.
>
>Best,
>
>Lasantha

Re: Re: Task scheduling of Flink

Posted by Lasantha Fernando <la...@gmail.com>.
Hi Penguin,

Jumping into this conversation since I worked on the same code base
(Flink-1.11.2) for a recent project and might have a fresher memory of
the method calls.

I believe the chain of methods you have highlighted is correct. Obviously,
the chain would differ based on the configuration and context. This is a
stack trace that I derived for a scheduling call for a simple Word Count
streaming application.

selectBestSlotForProfile:46, LocationPreferenceSlotSelectionStrategy (org.apache.flink.runtime.jobmaster.slotpool)
tryAllocateFromAvailable:275, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
allocateMultiTaskSlot:470, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
allocateSharedSlot:311, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
internalAllocateSlot:160, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
allocateSlotInternal:143, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
allocateSlot:113, SchedulerImpl (org.apache.flink.runtime.jobmaster.slotpool)
allocateSlot:115, SlotProviderStrategy$NormalSlotProviderStrategy (org.apache.flink.runtime.executiongraph)
lambda$allocateSlotsFor$0:104, DefaultExecutionSlotAllocator (org.apache.flink.runtime.scheduler)
apply:-1, 90274328 (org.apache.flink.runtime.scheduler.DefaultExecutionSlotAllocator$$Lambda$679)
uniComposeStage:995, CompletableFuture (java.util.concurrent)
thenCompose:2137, CompletableFuture (java.util.concurrent)
allocateSlotsFor:102, DefaultExecutionSlotAllocator (org.apache.flink.runtime.scheduler)
allocateSlots:339, DefaultScheduler (org.apache.flink.runtime.scheduler)
allocateSlotsAndDeploy:312, DefaultScheduler (org.apache.flink.runtime.scheduler)
allocateSlotsAndDeploy:76, EagerSchedulingStrategy (org.apache.flink.runtime.scheduler.strategy)
startScheduling:52, EagerSchedulingStrategy (org.apache.flink.runtime.scheduler.strategy)
startSchedulingInternal:173, DefaultScheduler (org.apache.flink.runtime.scheduler)
startScheduling:461, SchedulerBase (org.apache.flink.runtime.scheduler)
startScheduling:897, JobMaster (org.apache.flink.runtime.jobmaster)

It might be useful to remote debug a local setup to get the exact stack
trace/method-call-chain for your scenario. Navigating the code through
method calls to figure out the execution sequence can be a bit tricky since
Flink uses a lot of asynchronous calls.
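
As a small illustration of why asynchronous code can still show up as plain frames in such a trace, the sketch below (plain JDK only; the class name ThenComposeStackSketch is made up for this example) records the stack from inside a thenCompose callback. When the upstream CompletableFuture is already complete, the callback runs synchronously on the calling thread, so the caller and thenCompose both appear on the stack. This is only an analogy to the uniComposeStage/thenCompose frames above, not a statement about what Flink does in every configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ThenComposeStackSketch {
    // Returns "class#method" for every frame on the current thread's stack
    // at the moment the thenCompose callback runs.
    static List<String> framesSeenInsideCallback() {
        List<String> frames = new ArrayList<>();
        // The upstream future is already complete, so thenCompose invokes
        // the callback immediately on the calling thread.
        CompletableFuture<String> alreadyDone =
                CompletableFuture.completedFuture("slot-request");
        alreadyDone.thenCompose(value -> {
            for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
                frames.add(e.getClassName() + "#" + e.getMethodName());
            }
            return CompletableFuture.completedFuture(value);
        }).join();
        return frames;
    }
}
```

If the upstream future were completed later by another thread, the callback would run on that thread instead and the caller's frames would be missing, which is one reason stepping through with a debugger tends to be more reliable than reading the call sites.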

According to my understanding, the basic flow for scheduling goes like
this: the JobMaster calls startScheduling on the scheduler (implemented in
SchedulerBase, which DefaultScheduler extends), and that makes an internal
call to the preset scheduling strategy. The scheduling strategy is set when
the DefaultScheduler is created. The scheduling strategy basically
populates some deployment option configurations (though it can be used to
make more fine-grained scheduling changes), then tells the DefaultScheduler
to allocate the slots and deploy. The DefaultScheduler then goes through
another set of calls that allocate slots and select a slot for each
execution vertex (while also considering slot selection strategies).
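
The delegation described above can be sketched roughly as follows. This is a simplified, self-contained model and not Flink code: SchedulerSketch, EagerStrategySketch and SchedulerOperationsSketch are hypothetical names chosen to mirror the roles of the scheduler, the eager scheduling strategy and the callback the strategy uses, and the call log stands in for real slot allocation and deployment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical callback the strategy uses to hand work back to the scheduler.
interface SchedulerOperationsSketch {
    void allocateSlotsAndDeploy(List<String> vertexIds);
}

// Hypothetical strategy that asks for all vertices to be scheduled at once.
final class EagerStrategySketch {
    private final SchedulerOperationsSketch scheduler;
    private final List<String> allVertices;

    EagerStrategySketch(SchedulerOperationsSketch scheduler, List<String> allVertices) {
        this.scheduler = scheduler;
        this.allVertices = allVertices;
    }

    void startScheduling() {
        scheduler.allocateSlotsAndDeploy(allVertices);
    }
}

// Hypothetical scheduler: the strategy is fixed when the scheduler is created.
final class SchedulerSketch implements SchedulerOperationsSketch {
    final List<String> callLog = new ArrayList<>();
    private final EagerStrategySketch strategy;

    SchedulerSketch(List<String> vertices) {
        this.strategy = new EagerStrategySketch(this, vertices);
    }

    void startScheduling() {
        callLog.add("startScheduling");
        strategy.startScheduling(); // delegate the "what and when" to the strategy
    }

    @Override
    public void allocateSlotsAndDeploy(List<String> vertexIds) {
        callLog.add("allocateSlotsAndDeploy");
        for (String vertexId : vertexIds) {
            callLog.add("deploy:" + vertexId); // stand-in for slot selection + deployment
        }
    }
}
```

Calling new SchedulerSketch(Arrays.asList("source", "sink")).startScheduling() records startScheduling, then allocateSlotsAndDeploy, then one deploy entry per vertex, which is the same back-and-forth between scheduler and strategy visible in the stack trace.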

This information is then sent to the TaskManager through an RPC call
(via another chain of calls), and the TaskManager uses the task
information to deploy the tasks according to the schedule.

Hope this helps. I also hope someone from the community will correct me
if I have stated something incorrectly.

Best,

Lasantha

On Wed, 6 Jan 2021 at 21:01, penguin. <bx...@126.com> wrote:

> Hi Till,
>
> Thank you for your reply. I found the following chain of method calls:
>
>
> JobMaster#startScheduling -> SchedulerBase#startScheduling ->
> DefaultScheduler#startSchedulingInternal ->
> EagerSchedulingStrategy#startScheduling ->
> EagerSchedulingStrategy#allocateSlotsAndDeploy ->
> DefaultScheduler#allocateSlotsAndDeploy.
> (The version of Flink I use is Flink-1.11.1.)
>
>
> I'm going to look at the code that follows this chain first, but it seems
> to have very few comments. My goal feels very difficult to reach, and I
> hope I can get your help later.
>
>
> Sincerely,
> Penguin

Re:Re: Task scheduling of Flink

Posted by "penguin." <bx...@126.com>.
Hi Till,

Thank you for your reply. I found the following chain of method calls:


JobMaster#startScheduling -> SchedulerBase#startScheduling -> DefaultScheduler#startSchedulingInternal ->
EagerSchedulingStrategy#startScheduling -> EagerSchedulingStrategy#allocateSlotsAndDeploy -> DefaultScheduler#allocateSlotsAndDeploy.
(The version of Flink I use is Flink-1.11.1.)


I'm going to look at the code that follows this chain first, but it seems to have very few comments. My goal feels very difficult to reach, and I hope I can get your help later.


Sincerely,
Penguin

At 2021-01-06 17:08:05, "Till Rohrmann" <tr...@apache.org> wrote:
>Hi Penguin,
>
>What do you wanna do? If you want to change Flink's scheduling behaviour,
>then you can take a look at the implementations of SchedulerNG.
>
>Cheers,
>Till
>
>On Wed, Jan 6, 2021 at 6:58 AM penguin. <bx...@126.com> wrote:
>
>> Hello! Do you know how to modify the task scheduling method of Flink?

Re: Task scheduling of Flink

Posted by Till Rohrmann <tr...@apache.org>.
Hi Penguin,

What do you wanna do? If you want to change Flink's scheduling behaviour,
then you can take a look at the implementations of SchedulerNG.

Cheers,
Till

On Wed, Jan 6, 2021 at 6:58 AM penguin. <bx...@126.com> wrote:

> Hello! Do you know how to modify the task scheduling method of Flink?