Posted to dev@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2020/09/03 16:30:45 UTC

Re: [DISCUSS] FLIP-138: Declarative Resource management

Thanks for the feedback, Xintong and Zhu Zhu. I've added a few more details
on the intended interface extensions, potential follow-ups (removing the
AllocationIDs) and the question of whether to reuse or return a slot if
the profiles don't fully match.
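
To make the reuse-or-return question concrete, here is a minimal sketch in
Java. All names in it (SlotReuseSketch, Profile, Policy) are hypothetical and
not taken from Flink; it only contrasts an exact-match policy with a
"large enough" policy, under the assumption that a profile is just CPU and
memory.

```java
// Hypothetical sketch; none of these names are Flink API.
final class SlotReuseSketch {

    /** Simplified stand-in for a resource profile: CPU in millicores, memory in MB. */
    static final class Profile {
        final int cpuMillis;
        final int memoryMb;

        Profile(int cpuMillis, int memoryMb) {
            this.cpuMillis = cpuMillis;
            this.memoryMb = memoryMb;
        }

        /** True if both dimensions are identical. */
        boolean sameAs(Profile other) {
            return cpuMillis == other.cpuMillis && memoryMb == other.memoryMb;
        }

        /** True if this profile offers at least what 'required' asks for. */
        boolean fulfills(Profile required) {
            return cpuMillis >= required.cpuMillis && memoryMb >= required.memoryMb;
        }
    }

    enum Policy { EXACT_MATCH_ONLY, REUSE_IF_LARGE_ENOUGH }

    /** Decides whether a slot the job already holds may be reused for a task. */
    static boolean shouldReuse(Profile slot, Profile required, Policy policy) {
        if (policy == Policy.EXACT_MATCH_ONLY) {
            // Favors resource utilization: a mismatching slot is returned instead.
            return slot.sameAs(required);
        }
        // Favors deployment latency: skips the round-trip to the RM.
        return slot.fulfills(required);
    }
}
```

With a 2-core/4 GB slot and a 1-core/1 GB task, the exact-match policy returns
the slot, while the large-enough policy reuses it and leaves part of it idle.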

If nobody objects, then I would start a vote for this FLIP soon.

Cheers,
Till

On Mon, Aug 31, 2020 at 11:53 AM Zhu Zhu <re...@gmail.com> wrote:

> Thanks for the clarification @Till Rohrmann <tr...@apache.org>
>
> >> # Implications for the scheduling
> Agreed that it turned out to be different execution strategies for batch
> jobs.
> We can have a simple one first and improve it later.
>
> Thanks,
> Zhu
>
> On Mon, Aug 31, 2020 at 3:05 PM Xintong Song <to...@gmail.com> wrote:
>
>> Thanks for the clarification, @Till.
>>
>> - For FLIP-56, sounds good to me. I think there should be no problem
>> before removing AllocationID. And even after replacing AllocationID, it
>> should only require limited effort to make FLIP-56 work with SlotID. I was
>> just trying to understand when the effort will be needed.
>>
>> - For offer/release slots between JM/TM, I think you are right.
>> Waiting for the confirmation of the resource requirement decrease before
>> freeing the slot is essentially equivalent to releasing slots through the
>> RM, in that it practically prevents the JM from releasing slots when the RM
>> is absent. But this approach obviously requires less change to the current
>> mechanism.
>> Since the first problem can be solved by the declarative protocol, and the
>> second problem can be addressed by this confirmation-based approach, ATM I
>> don't see any strong reason for changing to offering and releasing slots
>> through the RM, especially considering the significant changes it requires.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>> > Thanks for creating this FLIP @Chesnay and the good input @Xintong and
>> > @Zhu Zhu.
>> >
>> > Let me try to add some comments concerning your questions:
>> >
>> > # FLIP-56
>> >
>> > I think there is nothing fundamentally contradicting FLIP-56 in the FLIP
>> > for declarative resource management. As Chesnay said, we have to keep the
>> > AllocationID around as long as we have the old scheduler implementation.
>> > Once it is replaced, we can think about using the SlotID instead of
>> > AllocationIDs for identifying allocated slots. For dynamic slots we can
>> > keep the special meaning of a SlotID with a negative index. In the future
>> > we might think about making this encoding a bit more explicit by sending a
>> > richer slot request object and reporting the actual SlotID back to the RM.
>> >
>> > For the question of resource utilization vs. deployment latency, I believe
>> > that this will be a question of requirements and preferences, as you've
>> > said, Xintong. I can see that we will have different strategies to fulfill
>> > the different needs.
>> >
>> > # Offer/free slots between JM/TM
>> >
>> > You are right, Xintong, that the existing slot protocol was developed with
>> > the assumption in mind that the RM and JM can run in separate processes and
>> > that a failure of the RM should only affect the JM in the sense that it
>> > cannot ask for more resources. I believe that one could simplify things a
>> > bit under the assumption that the RM and JM are always colocated in the
>> > same process. However, the discussion whether to change it or not should
>> > indeed be a separate one.
>> >
>> > Changing the slot protocol to declarative resource management should
>> > already solve the first problem you have described, because we won't ask
>> > for new slots in case of a failover but simply keep the same resource
>> > requirements declared and let the RM make sure that we will receive at
>> > least this number of slots.
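
A minimal sketch of why a failover does not trigger new requests under a
declarative protocol, assuming the RM only compares the declared number of
slots with the number currently allocated to a job. The class and method
names are hypothetical and not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the actual SlotManager bookkeeping.
final class DeclaredRequirementsSketch {
    private final Map<String, Integer> declared = new HashMap<>();
    private final Map<String, Integer> allocated = new HashMap<>();

    /** The job (re)declares how many slots it needs; this replaces the old value. */
    void declare(String jobId, int slots) {
        declared.put(jobId, slots);
    }

    /** A slot has been assigned to the job. */
    void slotAllocated(String jobId) {
        allocated.merge(jobId, 1, Integer::sum);
    }

    /** A slot held by the job went away (e.g. its TaskExecutor was lost). */
    void slotLost(String jobId) {
        allocated.computeIfPresent(jobId, (id, n) -> Math.max(0, n - 1));
    }

    /** Slots the RM still has to provide; zero if the declaration is already met. */
    int missingSlots(String jobId) {
        int want = declared.getOrDefault(jobId, 0);
        int have = allocated.getOrDefault(jobId, 0);
        return Math.max(0, want - have);
    }
}
```

A job restart that keeps its slots leaves both maps untouched, so
missingSlots stays at zero; only an actually lost slot makes the RM act.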
>> >
>> > If releasing a slot should lead to allocating new resources because
>> > decreasing the resource requirement declaration takes longer than releasing
>> > the slot on the TM, then we could apply what Chesnay said. Waiting on
>> > the confirmation of the resource requirement decrease and only then freeing
>> > the slot on the TM gives you effectively the same behaviour as if the slot
>> > were freed by the RM.
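
The ordering described above could be sketched as follows, assuming the RM's
acknowledgement is surfaced as a CompletableFuture. AckBeforeFreeSketch and
decreaseThenFree are hypothetical names, and the event list only stands in
for the real RPCs to the RM and the TM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; the strings in 'events' stand in for real RPC calls.
final class AckBeforeFreeSketch {
    final List<String> events = new ArrayList<>();

    /**
     * Declares the lower requirement first and frees the slot on the TM only
     * after the RM has acknowledged the decrease.
     */
    CompletableFuture<Void> decreaseThenFree(CompletableFuture<Void> rmAck, String slotId) {
        events.add("declared decrease");
        // thenRun fires only once rmAck completes normally.
        return rmAck.thenRun(() -> events.add("freed " + slotId));
    }
}
```

Until the acknowledgement arrives the slot stays allocated, so the RM never
sees a freed slot while it still believes the old requirement is in force.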
>> >
>> > I am not entirely sure whether allocating the slots and receiving the slot
>> > offers through the RM will allow us to get rid of the pending slot state on
>> > the RM side. If the RM needs to communicate with the TM and we want to have
>> > a reconciliation protocol between these components, then I think we would
>> > have to solve the exact same problem we have today of waiting on the TM to
>> > confirm that a slot has been allocated.
>> >
>> > # Implications for the scheduling
>> >
>> > The FLIP does not fully cover the changes for the scheduler and mainly
>> > drafts the rough idea. For the batch scheduling, I believe that we have a
>> > couple of degrees of freedom in how to do things. In the scenario you
>> > described, one could choose a simple strategy where we wait for all
>> > producers to stop before deciding on the parallelism of the consumer and
>> > scheduling the respective tasks (even though they have POINTWISE BLOCKING
>> > edges). Or we can try to be smart and say that if we get at least one slot,
>> > we can run the consumers with the same parallelism as the producers; it just
>> > might be that we have to run them one after another in a single slot. One
>> > advantage of not directly scheduling the first consumer when the first
>> > producer is finished is that one might schedule the consumer stage with a
>> > higher parallelism because one might acquire more resources a bit later.
>> > But I would see these as different execution strategies which have
>> > different properties.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <re...@gmail.com> wrote:
>> >
>> > > Thanks for the explanation @Chesnay Schepler <ch...@apache.org> .
>> > >
>> > > Yes, for batch jobs it can be safe to schedule downstream vertices if
>> > > there are enough slots in the pool, even if these slots are still in use
>> > > at that moment.
>> > > And the job can still progress even if the vertices stick to the original
>> > > parallelism.
>> > >
>> > > It looks to me like several decisions can differ between streaming and
>> > > batch jobs.
>> > > Looking forward to the follow-up FLIP on the lazy ExecutionGraph
>> > > construction!
>> > >
>> > > Thanks,
>> > > Zhu
>> > >
>> > > On Fri, Aug 28, 2020 at 4:35 PM Chesnay Schepler <ch...@apache.org> wrote:
>> > >
>> > >> Maybe :)
>> > >>
>> > >> Imagine a case where the producer and consumer have the same
>> > >> ResourceProfile, or at least one where the consumer requirements are
>> > >> less than the producer ones.
>> > >> In this case, the scheduler can happily schedule consumers, because it
>> > >> knows it will get enough slots.
>> > >>
>> > >> If the profiles are different, then the Scheduler _may_ wait for
>> > >> numberOf(producer) slots; it _may_ also stick with the parallelism and
>> > >> schedule right away, in the worst case running the consumers in sequence.
>> > >> In fact, for batch jobs there is probably(?) never a reason for the
>> > >> scheduler to _reduce_ the parallelism; it can always try to run things in
>> > >> sequence if it doesn't get enough slots.
>> > >> Reducing the parallelism would just mean that you'd have to wait for more
>> > >> producers to finish.
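
As a small worked example of "running the consumers in sequence": if the
parallelism is kept and fewer slots are available, the tasks run in several
waves. The helper below is a hypothetical illustration, not scheduler code,
and assumes every task occupies exactly one slot for one wave.

```java
// Hypothetical sketch; assumes one task per slot per wave.
final class SequentialWavesSketch {

    /** Number of sequential waves needed to run 'parallelism' tasks on 'availableSlots' slots. */
    static int waves(int parallelism, int availableSlots) {
        if (parallelism < 0 || availableSlots <= 0) {
            throw new IllegalArgumentException("need parallelism >= 0 and at least one slot");
        }
        // Integer ceiling of parallelism / availableSlots.
        return (parallelism + availableSlots - 1) / availableSlots;
    }
}
```

For instance, ten consumers on three slots need four waves, and on a single
slot they simply run one after another.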
>> > >>
>> > >> The scope of this FLIP is just the protocol, without changes to the
>> > >> scheduler; in other words, it just changes how slots are acquired but
>> > >> changes nothing about the scheduling. That is tackled in a follow-up FLIP.
>> > >>
>> > >> On 28/08/2020 07:34, Zhu Zhu wrote:
>> > >>
>> > >> Thanks for the response!
>> > >>
>> > >> >> The scheduler doesn't have to wait for one stage to finish
>> > >> Does it mean we will declare resources and decide the parallelism for a
>> > >> stage which is partially schedulable, i.e. when input data are ready just
>> > >> for part of the execution vertices?
>> > >>
>> > >> >> This will get more complicated once we allow the scheduler to change
>> > >> >> the parallelism while the job is running
>> > >> Agreed. It looks to me like a problem for batch jobs only, which can be
>> > >> avoided for streaming jobs.
>> > >> Will this FLIP limit its scope to streaming jobs, with improvements for
>> > >> batch jobs to be done later?
>> > >>
>> > >> Thanks,
>> > >> Zhu
>> > >>
>> > >> On Fri, Aug 28, 2020 at 2:27 AM Chesnay Schepler <ch...@apache.org> wrote:
>> > >>
>> > >>> The scheduler doesn't have to wait for one stage to finish. It is still
>> > >>> aware that the upstream execution vertex has finished, and can
>> > >>> request/use slots accordingly to schedule the consumer.
>> > >>>
>> > >>> This will get more complicated once we allow the scheduler to change the
>> > >>> parallelism while the job is running, for which we will need some
>> > >>> enhancements to the network stack to allow the producer to run without
>> > >>> knowing the consumer parallelism ahead of time. I'm not too clear on the
>> > >>> details, but we'll need some form of keygroup-like approach for sub
>> > >>> partitions (maxParallelism and all that).
>> > >>>
>> > >>> On 27/08/2020 20:05, Zhu Zhu wrote:
>> > >>>
>> > >>> Thanks Chesnay & Till for proposing this improvement.
>> > >>> It's of good value to allow jobs to make the best use of available
>> > >>> resources adaptively, not to mention that it further supports reactive
>> > >>> mode.
>> > >>> So big +1 for it.
>> > >>>
>> > >>> I have a minor concern about a possible regression in certain cases due
>> > >>> to the proposed JobVertex-wise scheduling, which replaces the current
>> > >>> ExecutionVertex-wise scheduling.
>> > >>> In the proposal, it looks to me like a stage has to finish before its
>> > >>> consumer stage can be scheduled. This limitation, however, does not exist
>> > >>> in the current scheduler. In the case that there exists a POINTWISE
>> > >>> BLOCKING edge, the downstream execution region can be scheduled right
>> > >>> after its connected upstream execution vertices finish, even before the
>> > >>> whole upstream stage finishes. This allows the region to be launched
>> > >>> earlier and make use of available resources.
>> > >>> Do we need to let the new scheduler retain this property?
>> > >>>
>> > >>> Thanks,
>> > >>> Zhu
>> > >>>
>> > >>> On Wed, Aug 26, 2020 at 6:59 PM Xintong Song <to...@gmail.com> wrote:
>> > >>>
>> > >>>> Thanks for the quick response.
>> > >>>>
>> > >>>> *Job prioritization, Allocation IDs, Minimum resource
>> > >>>> requirements, SlotManager Implementation Plan:* Sounds good to me.
>> > >>>>
>> > >>>> *FLIP-56*
>> > >>>> Good point about the trade-off. I believe maximum resource utilization
>> > >>>> and quick deployment are desired in different scenarios. E.g., a
>> > >>>> long-running streaming job can afford some deployment latency to improve
>> > >>>> the resource utilization, which benefits the entire lifecycle of the job.
>> > >>>> On the other hand, short batch queries may prefer quick deployment;
>> > >>>> otherwise the time for resource allocation might significantly increase
>> > >>>> the response time.
>> > >>>> It would be good enough for me to bring these questions to attention.
>> > >>>> Nothing that I'm aware of should block this FLIP.
>> > >>>>
>> > >>>> Thank you~
>> > >>>>
>> > >>>> Xintong Song
>> > >>>>
>> > >>>>
>> > >>>>
>> > >>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <chesnay@apache.org>
>> > >>>> wrote:
>> > >>>>
>> > >>>> > Thank you Xintong for your questions!
>> > >>>> > Job prioritization
>> > >>>> > Yes, the job which declares its initial requirements first is
>> > >>>> > prioritized.
>> > >>>> > This is very much for simplicity; for example, this avoids the nasty
>> > >>>> > case where all jobs get some resources, but none get enough to actually
>> > >>>> > run the job.
>> > >>>> > Minimum resource requirements
>> > >>>> >
>> > >>>> > My bad; at some point we want to allow the JobMaster to declare a
>> > >>>> > range of resources it could use to run a job, for example min=1,
>> > >>>> > target=10, max=+inf.
>> > >>>> >
>> > >>>> > With this model, the RM would then try to balance the resources such
>> > >>>> > that as many jobs as possible are as close to the target state as
>> > >>>> > possible.
>> > >>>> >
>> > >>>> > Currently, the minimum/target/maximum resources are all the same. So
>> > >>>> > the notification is sent whenever the current requirements cannot be
>> > >>>> > met.
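
A minimal sketch of such a range, assuming resources are counted in slots and
"+inf" is represented by Integer.MAX_VALUE. RequirementRangeSketch and its
methods are hypothetical names and not part of the FLIP's interfaces.

```java
// Hypothetical sketch; the FLIP does not define this class.
final class RequirementRangeSketch {
    final int min;
    final int target;
    final int max;

    RequirementRangeSketch(int min, int target, int max) {
        if (min < 0 || min > target || target > max) {
            throw new IllegalArgumentException("expected 0 <= min <= target <= max");
        }
        this.min = min;
        this.target = target;
        this.max = max;
    }

    /** The current situation: minimum, target and maximum are all the same. */
    static RequirementRangeSketch fixed(int slots) {
        return new RequirementRangeSketch(slots, slots, slots);
    }

    /** The job can run at all only if the minimum is covered. */
    boolean canRun(int availableSlots) {
        return availableSlots >= min;
    }

    /** Slots the job would actually use out of what is available. */
    int usableSlots(int availableSlots) {
        return Math.min(availableSlots, max);
    }
}
```

With fixed(10), nine available slots already mean the requirements cannot be
met, which matches the case where a notification would be sent today.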
>> > >>>> > Allocation IDs
>> > >>>> > We do intend to, at the very least, remove AllocationIDs on the
>> > >>>> > SlotManager side, as they are just not required there.
>> > >>>> >
>> > >>>> > On the SlotPool side we have to keep them around at least until the
>> > >>>> > existing SlotPool implementations are removed (not sure whether we'll
>> > >>>> > fully commit to this in 1.12), since the interfaces use AllocationIDs,
>> > >>>> > which also bleed into the JobMaster.
>> > >>>> > The TaskExecutor is in a similar position.
>> > >>>> > But in the long term, yes, they will be removed, and most usages will
>> > >>>> > probably be replaced by the SlotID.
>> > >>>> > FLIP-56
>> > >>>> >
>> > >>>> > Dynamic slot allocations are indeed quite interesting and raise a few
>> > >>>> > questions; for example, their main purpose is to ensure maximum
>> > >>>> > resource utilization. In that case, should the JobMaster be allowed to
>> > >>>> > re-use a slot if the task requires fewer resources than the slot
>> > >>>> > provides, or should it always request a new slot that exactly matches?
>> > >>>> >
>> > >>>> > There is a trade-off to be made between maximum resource utilization
>> > >>>> > (request exactly matching slots, and only re-use exact matches) and
>> > >>>> > quicker job deployment (re-use slots even if they don't exactly match,
>> > >>>> > skip the round-trip to the RM).
>> > >>>> >
>> > >>>> > As for how to handle the lack of preemptively known SlotIDs, that
>> > >>>> > should be fine in and of itself; we already handle a similar case when
>> > >>>> > we request a new TaskExecutor to be started. So long as there is some
>> > >>>> > way to know how many resources the TaskExecutor has in total, I do not
>> > >>>> > see a problem at the moment. We will get the SlotID eventually by virtue
>> > >>>> > of the heartbeat SlotReport.
>> > >>>> > Implementation plan (SlotManager)
>> > >>>> > You are on the right track. The SlotManager tracks the declared
>> > >>>> > resource requirements, and if the requirements increase it creates a
>> > >>>> > SlotRequest, which then goes through similar code paths as we have at
>> > >>>> > the moment (try to find a free slot; if found, tell the TM; otherwise
>> > >>>> > try to request a new TM).
>> > >>>> > The SlotManager changes are not that substantial to get a working
>> > >>>> > version; we have a PoC, and most of the work went into refactoring the
>> > >>>> > SlotManager into a more manageable state (split into several
>> > >>>> > components, stricter and simplified Slot life-cycle, ...).
>> > >>>> > Offer/free slots between JM/TM
>> > >>>> > Gotta run, but that's a good question and I'll think about it. But I
>> > >>>> > think it comes down to making fewer changes, and being able to leverage
>> > >>>> > existing reconciliation protocols.
>> > >>>> > Do note that TaskExecutors also explicitly inform the RM about freed
>> > >>>> > slots; the heartbeat slot report is just a safety net.
>> > >>>> > I'm not sure whether slot requests are able to overtake a slot
>> > >>>> > release; @till do you have thoughts on that?
>> > >>>> > As for the race condition between the requirements reduction and slot
>> > >>>> > release, if we run into problems we have the backup plan of only
>> > >>>> > releasing the slot after the requirement reduction has been
>> > >>>> > acknowledged.
>> > >>>> >
>> > >>>> > On 26/08/2020 10:31, Xintong Song wrote:
>> > >>>> >
>> > >>>> > Thanks for preparing the FLIP and driving this discussion, @Chesnay &
>> > >>>> > @Till.
>> > >>>> >
>> > >>>> > I really like the idea. I see great value in the proposed declarative
>> > >>>> > resource management, in terms of flexibility, usability and efficiency.
>> > >>>> >
>> > >>>> > I have a few comments and questions regarding the FLIP design. In
>> > >>>> > general, the protocol design makes good sense to me. My main concern is
>> > >>>> > that it is not very clear to me what changes are required from the
>> > >>>> > Resource/SlotManager side to adapt to the new protocol.
>> > >>>> >
>> > >>>> > *1. Distributed slots across different jobs*
>> > >>>> >
>> > >>>> > Jobs which register their requirements first, will have precedence
>> > >>>> > over other jobs also if the requirements change during the runtime.
>> > >>>> >
>> > >>>> > Just trying to understand, does this mean jobs are prioritized by the
>> > >>>> > order of their first resource declaration?
>> > >>>> >
>> > >>>> > *2. AllocationID*
>> > >>>> >
>> > >>>> > Is this FLIP suggesting to completely remove AllocationID?
>> > >>>> >
>> > >>>> > I'm fine with this change. It seems that wherever AllocationID is used,
>> > >>>> > it can either be removed or be replaced by JobID. This reflects the
>> > >>>> > concept that slots are now assigned to a job instead of its individual
>> > >>>> > slot requests.
>> > >>>> >
>> > >>>> > I would like to bring to attention that this also requires changes on
>> > >>>> > the TM side, with respect to FLIP-56[1].
>> > >>>> >
>> > >>>> > In the context of dynamic slot allocation introduced by FLIP-56, slots
>> > >>>> > do not pre-exist on the TM and are dynamically created when the RM calls
>> > >>>> > TaskExecutorGateway.requestSlot. Since neither the slots nor their
>> > >>>> > SlotIDs pre-exist, the RM requests slots from the TM with a special
>> > >>>> > SlotID (negative slot index). The semantics change from "requesting the
>> > >>>> > slot identified by the given SlotID" to "requesting a slot with the
>> > >>>> > given resource profile". The AllocationID is used for identifying the
>> > >>>> > dynamic slots in such cases.
>> > >>>> >
>> > >>>> > From the perspective of FLIP-56 and fine-grained resource management,
>> > >>>> > I'm fine with removing AllocationID. In the meantime, we would need the
>> > >>>> > TM to recognize the special negative-indexed SlotID and generate a new
>> > >>>> > unique SlotID for identifying the slot.
>> > >>>> >
>> > >>>> > *3. Minimum resource requirement*
>> > >>>> >
>> > >>>> > However, we can let the JobMaster know if we cannot fulfill the
>> > >>>> > minimum resource requirement for a job after
>> > >>>> > resourcemanager.standalone.start-up-time has passed.
>> > >>>> >
>> > >>>> > What is the "minimum resource requirement for a job"? Did I overlook
>> > >>>> > anything?
>> > >>>> >
>> > >>>> > *4. Offer/free slots between JM/TM*
>> > >>>> >
>> > >>>> > This probably deserves a separate discussion thread. Just want to
>> > >>>> > bring it up.
>> > >>>> >
>> > >>>> > The idea has been coming to me for quite some time. Is this design,
>> > >>>> > in which the JM requests resources from the RM while accepting/releasing
>> > >>>> > resources from/to the TM, the right thing?
>> > >>>> >
>> > >>>> > The pain point is that events of JM's activities (requesting/releasing
>> > >>>> > resources) arrive at RM out of order. This leads to several problems.
>> > >>>> >
>> > >>>> >    - When a job fails and task cancelation takes long, some of the
>> > >>>> >    slots might be released from the slot pool due to being unused for a
>> > >>>> >    while. Then the job restarts and requests these slots again. At this
>> > >>>> >    time, RM may receive slot requests before noticing from TM heartbeats
>> > >>>> >    that previous slots are released, thus requesting new resources. I've
>> > >>>> >    seen many times that the Yarn cluster has a heavy load and is not
>> > >>>> >    allocating resources quickly enough, which leads to slot request
>> > >>>> >    timeout and job failover, and during the failover more resources are
>> > >>>> >    requested, which adds more load to the Yarn cluster. Happily, this
>> > >>>> >    should be improved with the declarative resource management. :)
>> > >>>> >    - As described in this FLIP, it is possible that RM learns of the
>> > >>>> >    release of slots from the TM heartbeat before noticing the decrease in
>> > >>>> >    resource requirements; it may then allocate more resources which need
>> > >>>> >    to be released soon.
>> > >>>> >    - It complicates the ResourceManager/SlotManager by requiring an
>> > >>>> >    additional slot state PENDING, which means the slot is assigned by RM
>> > >>>> >    but is not yet confirmed by TM as successfully allocated.
>> > >>>> >
>> > >>>> > Why not just make RM offer the allocated resources (TM address,
>> > >>>> > SlotID, etc.) to JM, and JM release resources to RM? So that for all
>> > >>>> > the resource management JM talks to RM, and for the task deployment and
>> > >>>> > execution it talks to TM?
>> > >>>> >
>> > >>>> > I tried to understand the benefits of the current design, and found
>> > >>>> > the following in FLIP-6[2].
>> > >>>> >
>> > >>>> >
>> > >>>> > All that the ResourceManager does is negotiate between the
>> > >>>> > cluster-manager, the JobManager, and the TaskManagers. Its state can
>> > >>>> > hence be reconstructed from re-acquiring containers and re-registration
>> > >>>> > from JobManagers and TaskManagers
>> > >>>> >
>> > >>>> > Correct me if I'm wrong, but it seems the original purpose is to make
>> > >>>> > sure the assignment between jobs and slots is confirmed between JM and
>> > >>>> > TMs, so that failures of RM will not lead to any inconsistency. However,
>> > >>>> > this only benefits scenarios where RM fails while JM and TMs live.
>> > >>>> > Currently, JM and RM are in the same process. We do not really have any
>> > >>>> > scenario where RM fails alone. We might separate JM and RM into
>> > >>>> > different processes in the future, but as far as I can see we don't have
>> > >>>> > such requirements at the moment. It seems to me that we are suffering
>> > >>>> > the current problems for the sake of potential future benefits.
>> > >>>> >
>> > >>>> > Maybe I overlooked something.
>> > >>>> >
>> > >>>> > *5. Implementation Plan*
>> > >>>> >
>> > >>>> > For SlotPool, it sounds quite straightforward to "aggregate
>> > >>>> > individual slot requests".
>> > >>>> >
>> > >>>> > For Resource/SlotManager, it seems there are quite a lot of changes
>> > >>>> > needed, with the removal of individual slot requests and AllocationID.
>> > >>>> > It's not clear to me what the first-step plan for RM/SM is. Do we
>> > >>>> > internally treat the resource requirements as individual slot requests
>> > >>>> > as the first step, so only the interfaces are changed? Or do we actually
>> > >>>> > change (practically re-write) the slot allocation logic?
>> > >>>> >
>> > >>>> > Thank you~
>> > >>>> >
>> > >>>> > Xintong Song
>> > >>>> >
>> > >>>> >
>> > >>>> > [1]
>> > >>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
>> > >>>> > [2]
>> > >>>> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
>> > >>>> >
>> > >>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <chesnay@apache.org>
>> > >>>> > wrote:
>> > >>>> >
>> > >>>> >
>> > >>>> > Hello,
>> > >>>> >
>> > >>>> > in FLIP-138 we want to rework the way the JobMaster acquires slots,
>> > >>>> > such that required resources are declared before a job is scheduled and
>> > >>>> > the job execution is adjusted according to the provided resources (e.g.,
>> > >>>> > reducing parallelism), instead of asking for a fixed number of resources
>> > >>>> > during scheduling and failing midway through if not enough resources
>> > >>>> > are available.
>> > >>>> >
>> > >>>> > This is a stepping stone towards reactive mode, where Flink will
>> > >>>> > automatically make use of new TaskExecutors being started.
>> > >>>> >
>> > >>>> > More details can be found here
>> > >>>> > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management>.
>> > >>>> >
>> > >>>> >
>> > >>>> >
>> > >>>>
>> > >>>
>> > >>>
>> > >>
>> >
>>
>

Re: [DISCUSS] FLIP-138: Declarative Resource management

Posted by Zhu Zhu <re...@gmail.com>.
The new edits look good to me.
Looking forward to the vote.

Thanks,
Zhu

On Fri, Sep 4, 2020 at 9:49 AM Xintong Song <to...@gmail.com> wrote:

> Thanks Till, the changes look good to me. Looking forward to the vote.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Sep 4, 2020 at 12:31 AM Till Rohrmann <tr...@apache.org>
> wrote:
>
> > Thanks for the feedback Xintong and Zhu Zhu. I've added a bit more
> details
> > for the intended interface extensions, potential follow ups (removing the
> > AllocationIDs) and the question about whether to reuse or return a slot
> if
> > the profiles don't fully match.
> >
> > If nobody objects, then I would start a vote for this FLIP soon.
> >
> > Cheers,
> > Till
> >
> > On Mon, Aug 31, 2020 at 11:53 AM Zhu Zhu <re...@gmail.com> wrote:
> >
> > > Thanks for the clarification @Till Rohrmann <tr...@apache.org>
> > >
> > > >> # Implications for the scheduling
> > > Agreed that it turned out to be different execution strategies for
> batch
> > > jobs.
> > > We can have a simple one first and improve it later.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > On Mon, Aug 31, 2020 at 3:05 PM Xintong Song <to...@gmail.com> wrote:
> > >
> > >> Thanks for the clarification, @Till.
> > >>
> > >> - For FLIP-56, sounds good to me. I think there should be no problem
> > >> before
> > >> removing AllocationID. And even after replacing AllocationID, it
> should
> > >> only require limited effort to make FLIP-56 work with SlotID. I was
> just
> > >> trying to understand when the effort will be needed.
> > >>
> > >> - For offer/release slots between JM/TM, I think you are right.
> > >> Waiting on the confirmation for resource requirement decrease before
> > >> freeing the slot is quite equivalent to releasing slots through RM, in
> > >> terms of it practically preventing JM from releasing slots when the RM
> > is
> > >> absent. But this approach obviously requires less change to the
> current
> > >> mechanism.
> > >> Since the first problem can be solved by the declarative protocol, and
> > the
> > >> second problem can be addressed by this confirmation based approach,
> > ATM I
> > >> don't see any strong reason for changing to offering and releasing
> slots
> > >> through RM, especially considering the significant changes it
> requires.
> > >>
> > >> Thank you~
> > >>
> > >> Xintong Song
> > >>
> > >>
> > >>
> > >> On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <tr...@apache.org>
> > >> wrote:
> > >>
> > >> > Thanks for creating this FLIP @Chesnay and the good input @Xintong
> and
> > >> @Zhu
> > >> > Zhu.
> > >> >
> > >> > Let me try to add some comments concerning your questions:
> > >> >
> > >> > # FLIP-56
> > >> >
> > >> > I think there is nothing fundamentally contradicting FLIP-56 in the
> > FLIP
> > >> > for declarative resource management. As Chesnay said, we have to
> keep
> > >> the
> > >> > AllocationID around as long as we have the old scheduler
> > implementation.
> > >> > Once it is replaced, we can think about using the SlotID instead of
> > >> > AllocationIDs for identifying allocated slots. For dynamic slots we
> > can
> > >> > keep the special meaning of a SlotID with a negative index. In the
> > >> future
> > >> > we might think about making this encoding a bit more explicit by
> > >> sending a
> > >> > richer slot request object and reporting the actual SlotID back to
> the
> > >> RM.
> > >> >
> > >> > For the question of resource utilization vs. deployment latency I
> > >> believe
> > >> > that this will be a question of requirements and preferences as
> you've
> > >> said
> > >> > Xintong. I can see that we will have different strategies to fulfill
> > the
> > >> > different needs.
> > >> >
> > >> > # Offer/free slots between JM/TM
> > >> >
> > >> > You are right Xintong that the existing slot protocol was developed
> > with
> > >> > the assumption in mind that the RM and JM can run in separate
> > processes
> > >> and
> > >> > that a failure of the RM should only affect the JM in the sense that
> > it
> > >> > cannot ask for more resources. I believe that one could simplify
> > things
> > >> a
> > >> > bit under the assumption that the RM and JM are always colocated in
> > the
> > >> > same process. However, the discussion whether to change it or not
> > should
> > >> > indeed be a separate one.
> > >> >
> > >> > Changing the slot protocol to a declarative resource management
> should
> > >> > already solve the first problem you have described because we won't
> > ask
> > >> for
> > >> > new slots in case of a failover but simply keep the same resource
> > >> > requirements declared and let the RM make sure that we will receive
> at
> > >> > least this amount of slots.
> > >> >
> > >> > If releasing a slot should lead to allocating new resources because
> > >> > decreasing the resource requirement declaration takes longer than
> > >> releasing
> > >> > the slot on the TM, then we could apply what Chesnay said. By
> waiting
> > on
> > >> > the confirmation of the resource requirement decrease and then
> freeing
> > >> the
> > >> > slot on the TM gives you effectively the same behaviour as if the
> > >> freeing
> > >> > of the slot would be done by the RM.
> > >> >
> > >> > I am not entirely sure whether allocating the slots and receiving
> the
> > >> slot
> > >> > offers through the RM will allow us to get rid of the pending slot
> > >> state on
> > >> > the RM side. If the RM needs to communicate with the TM and we want
> to
> > >> have
> > >> > a reconciliation protocol between these components, then I think we
> > >> would
> > >> > have to solve the exact same problem of currently waiting on the TM
> > for
> > >> > confirming that a slot has been allocated.
> > >> >
> > >> > # Implications for the scheduling
> > >> >
> > >> > The FLIP does not fully cover the changes for the scheduler and
> mainly
> > >> > drafts the rough idea. For the batch scheduling, I believe that we
> > have
> > >> a
> > >> > couple degrees of freedom in how to do things. In the scenario you
> > >> > described, one could choose a simple strategy where we wait for all
> > >> > producers to stop before deciding on the parallelism of the consumer
> > and
> > >> > scheduling the respective tasks (even though they have POINTWISE
> > >> BLOCKING
> > >> > edges). Or we can try to be smart and say that if we get at least one
> > >> > slot, we can run the consumers with the same parallelism as the
> > >> > producers; it just might be that we have to run them one after another
> > >> > in a single slot. One advantage of not directly scheduling the first
> > >> > consumer when the first producer is finished is that one might schedule
> > >> > the consumer stage with a higher parallelism because one might acquire
> > >> > more resources a bit later.
> > >> > But I would see this as different execution strategies which have
> > >> different
> > >> > properties.
> > >> >
> > >> > Cheers,
> > >> > Till
> > >> >
> > >> > On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <re...@gmail.com> wrote:
> > >> >
> > >> > > Thanks for the explanation @Chesnay Schepler <ch...@apache.org>
> .
> > >> > >
> > >> > > Yes, for batch jobs it can be safe to schedule downstream vertices
> > if
> > >> > > there
> > >> > > are enough slots in the pool, even if these slots are still in use
> > at
> > >> > that
> > >> > > moment.
> > >> > > And the job can still progress even if the vertices stick to the
> > >> original
> > >> > > parallelism.
> > >> > >
> > >> > > It looks to me like several decisions can differ between streaming
> > >> > > and batch jobs.
> > >> > > Looking forward to the follow-up FLIP on the lazy ExecutionGraph
> > >> > > construction!
> > >> > >
> > >> > > Thanks,
> > >> > > Zhu
> > >> > >
> > >> > > On Fri, Aug 28, 2020 at 4:35 PM Chesnay Schepler <ch...@apache.org> wrote:
> > >> > >
> > >> > >> Maybe :)
> > >> > >>
> > >> > >> Imagine a case where the producer and consumer have the same
> > >> > >> ResourceProfile, or at least one where the consumer requirements
> > are
> > >> > less
> > >> > >> than the producer ones.
> > >> > >> In this case, the scheduler can happily schedule consumers,
> because
> > >> it
> > >> > >> knows it will get enough slots.
> > >> > >>
> > >> > >> If the profiles are different, then the Scheduler _may_ wait for
> > >> > >> numberOf(producer) slots; it _may_ also stick with the parallelism
> > >> > >> and schedule right away, in the worst case running the consumers in
> > >> > >> sequence.
> > >> > >> In fact, for batch jobs there is probably(?) never a reason for
> the
> > >> > >> scheduler to _reduce_ the parallelism; it can always try to run
> > >> things
> > >> > in
> > >> > >> sequence if it doesn't get enough slots.
> > >> > >> Reducing the parallelism would just mean that you'd have to wait
> > for
> > >> > more
> > >> > >> producers to finish.
> > >> > >>
> > >> > >> The scope of this FLIP is just the protocol, without changes to
> the
> > >> > >> scheduler; in other words just changing how slots are acquired,
> but
> > >> > change
> > >> > >> nothing about the scheduling. That is tackled in a follow-up
> FLIP.
> > >> > >>
> > >> > >> On 28/08/2020 07:34, Zhu Zhu wrote:
> > >> > >>
> > >> > >> Thanks for the response!
> > >> > >>
> > >> > >> >> The scheduler doesn't have to wait for one stage to finish
> > >> > >> Does it mean we will declare resources and decide the parallelism
> > >> for a
> > >> > >> stage which is partially
> > >> > >> schedulable, i.e. when input data are ready just for part of the
> > >> > >> execution vertices?
> > >> > >>
> > >> > >> >> This will get more complicated once we allow the scheduler to
> > >> change
> > >> > >> the parallelism while the job is running
> > >> > >> Agreed. Looks to me it's a problem for batch jobs only and can be
> > >> > avoided
> > >> > >> for streaming jobs.
> > >> > >> Will this FLIP limit its scope to streaming jobs, and
> improvements
> > >> for
> > >> > >> batch jobs are to be done later?
> > >> > >>
> > >> > >> Thanks,
> > >> > >> Zhu
> > >> > >>
> > >> > >> On Fri, Aug 28, 2020 at 2:27 AM Chesnay Schepler <ch...@apache.org> wrote:
> > >> > >>
> > >> > >>> The scheduler doesn't have to wait for one stage to finish. It
> is
> > >> still
> > >> > >>> aware that the upstream execution vertex has finished, and can
> > >> > request/use
> > >> > >>> slots accordingly to schedule the consumer.
> > >> > >>>
> > >> > >>> This will get more complicated once we allow the scheduler to
> > change
> > >> > the
> > >> > >>> parallelism while the job is running, for which we will need
> some
> > >> > >>> enhancements to the network stack to allow the producer to run
> > >> without
> > >> > >>> knowing the consumer parallelism ahead of time. I'm not too clear on
> > >> > >>> the details, but we'll need some form of keygroup-like approach for
> > >> > >>> subpartitions (maxParallelism and all that).
> > >> > >>>
> > >> > >>> On 27/08/2020 20:05, Zhu Zhu wrote:
> > >> > >>>
> > >> > >>> Thanks Chesnay&Till for proposing this improvement.
> > >> > >>> It's of good value to allow jobs to make best use of available
> > >> > resources
> > >> > >>> adaptively. Not
> > >> > >>> to mention it further supports reactive mode.
> > >> > >>> So big +1 for it.
> > >> > >>>
> > >> > >>> I have a minor concern about possible regression in certain
> cases
> > >> due
> > >> > to
> > >> > >>> the proposed
> > >> > >>> JobVertex-wise scheduling which replaces current
> > >> ExecutionVertex-wise
> > >> > >>> scheduling.
> > >> > >>> In the proposal, looks to me it requires a stage to finish
> before
> > >> its
> > >> > >>> consumer stage can be
> > >> > >>> scheduled. This limitation, however, does not exist in current
> > >> > >>> scheduler. In the case that there
> > >> > >>> exists a POINTWISE BLOCKING edge, the downstream execution
> region
> > >> can
> > >> > be
> > >> > >>> scheduled
> > >> > >>> right after its connected upstream execution vertices finish, even
> > >> > >>> before the whole upstream
> > >> > >>> stage finishes. This allows the region to be launched earlier
> and
> > >> make
> > >> > >>> use of available resources.
> > >> > >>> Do we need to let the new scheduler retain this property?
> > >> > >>>
> > >> > >>> Thanks,
> > >> > >>> Zhu
> > >> > >>>
> > >> > >>> On Wed, Aug 26, 2020 at 6:59 PM Xintong Song <to...@gmail.com> wrote:
> > >> > >>>
> > >> > >>>> Thanks for the quick response.
> > >> > >>>>
> > >> > >>>> *Job prioritization, Allocation IDs, Minimum resource
> > >> > >>>> requirements, SlotManager Implementation Plan:* Sounds good to
> > me.
> > >> > >>>>
> > >> > >>>> *FLIP-56*
> > >> > >>>> Good point about the trade-off. I believe maximum resource
> > >> utilization
> > >> > >>>> and
> > >> > >>>> quick deployment are desired in different scenarios. E.g., a
> long
> > >> > >>>> running
> > >> > >>>> streaming job deserves some deployment latency to improve the
> > >> resource
> > >> > >>>> utilization, which benefits the entire lifecycle of the job. On
> > the
> > >> > >>>> other
> > >> > >>>> hand, short batch queries may prefer quick deployment,
> otherwise
> > >> the
> > >> > >>>> time
> > >> > >>>> for resource allocation might significantly increase the
> response
> > >> > time.
> > >> > >>>> It would be good enough for me to bring these questions to
> > >> attention.
> > >> > >>>> Nothing that I'm aware of should block this FLIP.
> > >> > >>>>
> > >> > >>>> Thank you~
> > >> > >>>>
> > >> > >>>> Xintong Song
> > >> > >>>>
> > >> > >>>>
> > >> > >>>>
> > >> > >>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <
> > >> chesnay@apache.org>
> > >> > >>>> wrote:
> > >> > >>>>
> > >> > >>>> > Thank you Xintong for your questions!
> > >> > >>>> > Job prioritization
> > >> > >>>> > Yes, the job which declares its initial requirements first is
> > >> > >>>> > prioritized.
> > >> > >>>> > This is very much for simplicity; for example this avoids the
> > >> nasty
> > >> > >>>> case
> > >> > >>>> > where all jobs get some resources, but none get enough to
> > >> actually
> > >> > >>>> run the
> > >> > >>>> > job.
> > >> > >>>> > Minimum resource requirements
> > >> > >>>> >
> > >> > >>>> > My bad; at some point we want to allow the JobMaster to
> > declare a
> > >> > >>>> range of
> > >> > >>>> > resources it could use to run a job, for example min=1,
> > >> target=10,
> > >> > >>>> > max=+inf.
> > >> > >>>> >
> > >> > >>>> > With this model, the RM would then try to balance the
> resources
> > >> such
> > >> > >>>> that
> > >> > >>>> > as many jobs as possible are as close to the target state as
> > >> > possible.
> > >> > >>>> >
> > >> > >>>> > Currently, the minimum/target/maximum resources are all the
> > >> same. So
> > >> > >>>> the
> > >> > >>>> > notification is sent whenever the current requirements cannot
> > be
> > >> > met.
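
To make the min/target/max idea above concrete, here is a minimal Java sketch. It is only an illustration under assumptions: RequirementRange, exactly() and cannotMeetMinimum() are hypothetical names that do not appear in the FLIP or in Flink, and Integer.MAX_VALUE is used as a stand-in for max=+inf.

```java
// Hypothetical sketch: RequirementRange is an illustrative name, not a Flink class.
final class RequirementRange {
    final int min;    // fewest slots the job can run with
    final int target; // number of slots the job would ideally get
    final int max;    // most slots the job can use; Integer.MAX_VALUE stands in for +inf

    RequirementRange(int min, int target, int max) {
        if (min < 0 || min > target || target > max) {
            throw new IllegalArgumentException("expected 0 <= min <= target <= max");
        }
        this.min = min;
        this.target = target;
        this.max = max;
    }

    // Behaviour as described above for now: min, target and max coincide.
    static RequirementRange exactly(int slots) {
        return new RequirementRange(slots, slots, slots);
    }

    // True when the job should be told that its minimum cannot be fulfilled.
    boolean cannotMeetMinimum(int availableSlots) {
        return availableSlots < min;
    }
}
```

With exactly(10), any shortfall triggers the notification, which matches "the notification is sent whenever the current requirements cannot be met". With a range such as (1, 10, Integer.MAX_VALUE), only having no slot at all would.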
> > >> > >>>> > Allocation IDs
> > >> > >>>> > We do intend to, at the very least, remove AllocationIDs on
> the
> > >> > >>>> > SlotManager side, as they are just not required there.
> > >> > >>>> >
> > >> > >>>> > On the slotpool side we have to keep them around at least
> until
> > >> the
> > >> > >>>> > existing Slotpool implementations are removed (not sure
> whether
> > >> > we'll
> > >> > >>>> fully
> > >> > >>>> > commit to this in 1.12), since the interfaces use
> > AllocationIDs,
> > >> > >>>> which also
> > >> > >>>> > bleed into the JobMaster.
> > >> > >>>> > The TaskExecutor is in a similar position.
> > >> > >>>> > But in the long-term, yes they will be removed, and most
> usages
> > >> will
> > >> > >>>> > probably be replaced by the SlotID.
> > >> > >>>> > FLIP-56
> > >> > >>>> >
> > >> > >>>> > Dynamic slot allocations are indeed quite interesting and
> > raise a
> > >> > few
> > >> > >>>> > questions; for example, the main purpose of it is to ensure
> > >> maximum
> > >> > >>>> > resource utilization. In that case, should the JobMaster be allowed
> > >> > >>>> > to re-use a slot if the task requires fewer resources than the slot
> > >> > >>>> > provides, or should it always request a new slot that exactly
> > >> > >>>> > matches?
> > >> > >>>> >
> > >> > >>>> > There is a trade-off to be made between maximum resource
> > >> utilization
> > >> > >>>> > (request exactly matching slots, and only re-use exact matches) and
> > >> > >>>> > quicker job deployment (re-use slots even if they don't exactly
> > >> > >>>> > match, skip the round-trip to the RM).
> > >> > >>>> >
> > >> > >>>> > As for how to handle the lack of preemptively known SlotIDs, that
> > >> > >>>> > should
> > >> > >>>> > be fine in and of itself; we already handle a similar case
> when
> > >> we
> > >> > >>>> request
> > >> > >>>> > a new TaskExecutor to be started. So long as there is some
> way
> > to
> > >> > >>>> know how
> > >> > >>>> > many resources the TaskExecutor has in total I do not see a
> > >> problem
> > >> > >>>> at the
> > >> > >>>> > moment. We will get the SlotID eventually by virtue of the
> > >> heartbeat
> > >> > >>>> > SlotReport.
> > >> > >>>> > Implementation plan (SlotManager)
> > >> > >>>> > You are on the right track. The SlotManager tracks the
> declared
> > >> > >>>> resource
> > >> > >>>> > requirements, and if the requirements increased it creates a
> > >> > >>>> SlotRequest,
> > >> > >>>> > which then goes through similar code paths as we have at the
> > >> moment
> > >> > >>>> (try to
> > >> > >>>> > find a free slot, if found tell the TM, otherwise try to
> > request
> > >> new
> > >> > >>>> TM).
> > >> > >>>> > The SlotManager changes are not that substantial to get a
> > working
> > >> > >>>> version;
> > >> > >>>> > we have a PoC and most of the work went into refactoring the
> > >> > >>>> SlotManager
> > >> > >>>> > into a more manageable state. (split into several components,
> > >> > >>>> stricter and
> > >> > >>>> > simplified Slot life-cycle, ...).
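
The SlotManager flow described above (track the declared requirements, turn an increase into slot requests, then try a free slot before asking for a new TM) could be sketched roughly as follows. This is a simplified illustration, not the PoC: every class and method name is hypothetical, slots are plain strings, and resource profiles are ignored.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: names are illustrative and do not match Flink's SlotManager.
final class RequirementTrackingSlotManager {
    private final Map<String, Integer> declaredSlots = new HashMap<>();
    private final Map<String, Integer> assignedSlots = new HashMap<>();
    private final Deque<String> freeSlotIds = new ArrayDeque<>();
    private int requestedTaskManagers = 0;

    void addFreeSlot(String slotId) {
        freeSlotIds.add(slotId);
    }

    // Stores the declaration; only an increase produces per-slot requests.
    void declareRequirements(String jobId, int requiredSlots) {
        int previous = declaredSlots.getOrDefault(jobId, 0);
        declaredSlots.put(jobId, requiredSlots);
        for (int i = previous; i < requiredSlots; i++) {
            handleSlotRequest(jobId);
        }
    }

    // Use a free slot if one exists, otherwise ask for a new TM.
    private void handleSlotRequest(String jobId) {
        if (freeSlotIds.poll() != null) {
            assignedSlots.merge(jobId, 1, Integer::sum); // the TM would now offer the slot to the job
        } else {
            requestedTaskManagers++;
        }
    }

    int assignedSlotCount(String jobId) {
        return assignedSlots.getOrDefault(jobId, 0);
    }

    int requestedTaskManagers() {
        return requestedTaskManagers;
    }
}
```

The sketch deliberately does nothing on a decrease; how and when slots are given back is the separate offer/free question discussed in this thread.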
> > >> > >>>> > Offer/free slots between JM/TM
> > >> > >>>> > Gotta run, but that's a good question and I'll think about it. But
> > >> > >>>> > I think it comes down to making fewer changes, and being able to
> > >> > >>>> > leverage existing reconciliation protocols.
> > >> > >>>> > Do note that TaskExecutors also explicitly inform the RM about
> > >> > >>>> > freed slots; the heartbeat slot report is just a safety net.
> > >> > >>>> > I'm not sure whether slot requests are able to overtake a
> slot
> > >> > >>>> release;
> > >> > >>>> > @till do you have thoughts on that?
> > >> > >>>> > As for the race condition between the requirements reduction
> > and
> > >> > slot
> > >> > >>>> > release, if we run into problems we have the backup plan of
> > only
> > >> > >>>> releasing
> > >> > >>>> > the slot after the requirement reduction has been
> acknowledged.
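
The backup plan above (free the slot only once the requirement reduction has been acknowledged) can be sketched with a CompletableFuture. This is a sketch under assumptions: ResourceManagerStub, TaskExecutorStub and releaseSlot() are hypothetical stand-ins, not Flink's gateways, and the acknowledgement is modelled as the future returned by the declaration call.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: the stub interfaces below are not Flink's RPC gateways.
final class AckedSlotRelease {
    // Stand-in for the RM side; the returned future completes on acknowledgement.
    interface ResourceManagerStub {
        CompletableFuture<Void> declareRequirements(int requiredSlots);
    }

    // Stand-in for the TM side.
    interface TaskExecutorStub {
        void freeSlot(String slotId);
    }

    // Lowers the declared requirement first and frees the slot only after the ack,
    // so the RM never sees a freed slot while the old, higher requirement is still in place.
    static CompletableFuture<Void> releaseSlot(
            ResourceManagerStub rm, TaskExecutorStub tm, String slotId, int newRequirement) {
        return rm.declareRequirements(newRequirement)
                .thenRun(() -> tm.freeSlot(slotId));
    }
}
```

If the acknowledgement never arrives (for example because the RM is unavailable), the slot is simply not freed in this sketch.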
> > >> > >>>> >
> > >> > >>>> > On 26/08/2020 10:31, Xintong Song wrote:
> > >> > >>>> >
> > >> > >>>> > Thanks for preparing the FLIP and driving this discussion,
> > >> @Chesnay
> > >> > &
> > >> > >>>> @Till.
> > >> > >>>> >
> > >> > >>>> > I really like the idea. I see a great value in the proposed
> > >> > >>>> declarative
> > >> > >>>> > resource management, in terms of flexibility, usability and
> > >> > >>>> efficiency.
> > >> > >>>> >
> > >> > >>>> > I have a few comments and questions regarding the FLIP
> design.
> > In
> > >> > >>>> general,
> > >> > >>>> > the protocol design makes good sense to me. My main concern
> is
> > >> that
> > >> > >>>> it is
> > >> > >>>> > not very clear to me what changes are required from the
> > >> > >>>> > Resource/SlotManager side to adapt to the new protocol.
> > >> > >>>> >
> > >> > >>>> > *1. Distributed slots across different jobs*
> > >> > >>>> >
> > >> > >>>> > Jobs which register their requirements first, will have
> > >> precedence
> > >> > >>>> over
> > >> > >>>> >
> > >> > >>>> > other jobs also if the requirements change during the
> runtime.
> > >> > >>>> >
> > >> > >>>> > Just trying to understand, does this mean jobs are
> prioritized
> > by
> > >> > the
> > >> > >>>> order
> > >> > >>>> > of their first resource declaring?
> > >> > >>>> >
> > >> > >>>> > *2. AllocationID*
> > >> > >>>> >
> > >> > >>>> > Is this FLIP suggesting to completely remove AllocationID?
> > >> > >>>> >
> > >> > >>>> > I'm fine with this change. It seems where AllocationID is
> used
> > >> can
> > >> > >>>> either
> > >> > >>>> > be removed or be replaced by JobID. This reflects the concept
> > >> that
> > >> > >>>> slots
> > >> > >>>> > are now assigned to a job instead of its individual slot
> > >> requests.
> > >> > >>>> >
> > >> > >>>> > I would like to bring to attention that this also requires
> > >> changes
> > >> > on
> > >> > >>>> the
> > >> > >>>> > TM side, with respect to FLIP-56[1].
> > >> > >>>> >
> > >> > >>>> > In the context of dynamic slot allocation introduced by
> > FLIP-56,
> > >> > >>>> slots do
> > >> > >>>> > not pre-exist on TM and are dynamically created when RM calls
> > >> > >>>> > TaskExecutorGateway.requestSlot. Since the slots do not
> > >> pre-exist,
> > >> > nor
> > >> > >>>> > their SlotIDs, RM requests slots from TM with a special
> SlotID
> > >> > >>>> (negative
> > >> > >>>> > slot index). The semantic changes from "requesting the slot
> > >> > >>>> identified by
> > >> > >>>> > the given SlotID" to "requesting a slot with the given
> resource
> > >> > >>>> profile".
> > >> > >>>> > The AllocationID is used for identifying the dynamic slots in
> > >> such
> > >> > >>>> cases.
> > >> > >>>> >
> > >> > >>>> > From the perspective of FLIP-56 and fine-grained resource
> > >> > >>>> management, I'm
> > >> > >>>> > fine with removing AllocationID. In the meantime, we would
> need
> > >> TM
> > >> > to
> > >> > >>>> > recognize the special negative indexed SlotID and generate a
> > new
> > >> > >>>> unique
> > >> > >>>> > SlotID for identifying the slot.
> > >> > >>>> >
> > >> > >>>> > *3. Minimum resource requirement*
> > >> > >>>> >
> > >> > >>>> > However, we can let the JobMaster know if we cannot fulfill
> the
> > >> > >>>> minimum
> > >> > >>>> >
> > >> > >>>> > resource requirement for a job after
> > >> > >>>> > resourcemanager.standalone.start-up-time has passed.
> > >> > >>>> >
> > >> > >>>> > What is the "minimum resource requirement for a job"? Did I
> > >> overlook
> > >> > >>>> > anything?
> > >> > >>>> >
> > >> > >>>> > *4. Offer/free slots between JM/TM*
> > >> > >>>> >
> > >> > >>>> > This probably deserves a separate discussion thread. Just
> want
> > to
> > >> > >>>> bring it
> > >> > >>>> > up.
> > >> > >>>> >
> > >> > >>>> > The idea has been coming to me for quite some time. Is this
> > >> design,
> > >> > >>>> that JM
> > >> > >>>> > requests resources from RM while accepting/releasing
> resources
> > >> > >>>> from/to TM,
> > >> > >>>> > the right thing?
> > >> > >>>> >
> > >> > >>>> > The pain point is that events of JM's activities
> > >> > (requesting/releasing
> > >> > >>>> > resources) arrive at RM out of order. This leads to several
> > >> > problems.
> > >> > >>>> >
> > >> > >>>> >    - When a job fails and task cancelation takes long, some
> of
> > >> the
> > >> > >>>> slots
> > >> > >>>> >    might be released from the slot pool due to being unused
> > for a
> > >> > >>>> while. Then
> > >> > >>>> >    the job restarts and requests these slots again. At this
> > >> time, RM
> > >> > >>>> may
> > >> > >>>> >    receive slot requests before noticing from TM heartbeats
> > that
> > >> > >>>> previous
> > >> > >>>> >    slots are released, thus requesting new resources. I've
> seen
> > >> many
> > >> > >>>> times
> > >> > >>>> >    that the Yarn cluster has a heavy load and is not
> allocating
> > >> > >>>> resources
> > >> > >>>> >    quickly enough, which leads to slot request timeout and
> job
> > >> > >>>> failover, and
> > >> > >>>> >    during the failover more resources are requested which
> adds
> > >> more
> > >> > >>>> load to
> > >> > >>>> >    the Yarn cluster. Happily, this should be improved with
> the
> > >> > >>>> declarative
> > >> > >>>> >    resource management. :)
> > >> > >>>> >    - As described in this FLIP, it is possible that RM learns
> > the
> > >> > >>>> releasing
> > >> > >>>> >    of slots from TM heartbeat before noticing the resource
> > >> > requirement
> > >> > >>>> >    decreasing, it may allocate more resources which need to
> be
> > >> > >>>> released soon.
> > >> > >>>> >    - It complicates the ResourceManager/SlotManager, by
> > >> requiring an
> > >> > >>>> >    additional slot state PENDING, which means the slot is
> > >> assigned
> > >> > by
> > >> > >>>> RM but
> > >> > >>>> >    is not confirmed successfully ordered by TM.
> > >> > >>>> >
> > >> > >>>> > Why not just make RM offer the allocated resources (TM
> address,
> > >> > >>>> SlotID,
> > >> > >>>> > etc.) to JM, and JM release resources to RM? So that for all
> > the
> > >> > >>>> resource
> > >> > >>>> > management JM talks to RM, and for the task deployment and
> > >> execution
> > >> > >>>> it
> > >> > >>>> > talks to TM?
> > >> > >>>> >
> > >> > >>>> > I tried to understand the benefits for having the current
> > design,
> > >> > and
> > >> > >>>> found
> > >> > >>>> > the following in FLIP-6[2].
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> > All that the ResourceManager does is negotiate between the
> > >> > >>>> > cluster-manager, the JobManager, and the TaskManagers. Its
> > state
> > >> can
> > >> > >>>> hence
> > >> > >>>> > be reconstructed from re-acquiring containers and
> > re-registration
> > >> > from
> > >> > >>>> > JobManagers and TaskManagers
> > >> > >>>> >
> > >> > >>>> > Correct me if I'm wrong, it seems the original purpose is to make
> > >> > >>>> > sure the assignment between jobs and slots is confirmed between JM
> > >> > >>>> > and TMs, so that
> > >> > >>>> > failures of RM will not lead to any inconsistency. However,
> > this
> > >> > only
> > >> > >>>> > benefits scenarios where RM fails while JM and TMs live.
> > >> Currently,
> > >> > >>>> JM and
> > >> > >>>> > RM are in the same process. We do not really have any
> scenario
> > >> where
> > >> > >>>> RM
> > >> > >>>> > fails alone. We might separate JM and RM to different
> processes
> > >> in
> > >> > >>>> future,
> > >> > >>>> > but as far as I can see we don't have such requirements at
> the
> > >> > >>>> moment. It
> > >> > >>>> > seems to me that we are suffering from the current problems for the
> > >> > >>>> > sake of potential future benefits.
> > >> > >>>> >
> > >> > >>>> > Maybe I overlooked something.
> > >> > >>>> >
> > >> > >>>> > *5. Implementation Plan*
> > >> > >>>> >
> > >> > >>>> > For SlotPool, it sounds quite straightforward to "aggregate
> > >> > >>>> individual slot
> > >> > >>>> > requests".
> > >> > >>>> >
> > >> > >>>> > For Resource/SlotManager, it seems there are quite a lot of changes
> > >> > >>>> > needed,
> > >> > >>>> > with the removal of individual slot requests and
> AllocationID.
> > >> It's
> > >> > >>>> not
> > >> > >>>> > clear to me what is the first step plan for RM/SM? Do we
> > >> internally
> > >> > >>>> treat
> > >> > >>>> > the resource requirements as individual slot requests as the
> > >> first
> > >> > >>>> step, so
> > >> > >>>> > only the interfaces are changed? Or do we actually change
> > >> > >>>> > (practically re-write) the slot allocation logic?
> > >> > >>>> >
> > >> > >>>> > Thank you~
> > >> > >>>> >
> > >> > >>>> > Xintong Song
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> > [1]
> > >> > >>>>
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> > >> > >>>> > [2]
> > >> > >>>>
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> > >> > >>>> >
> > >> > >>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <
> > >> > chesnay@apache.org>
> > >> > >>>> <ch...@apache.org> wrote:
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> > Hello,
> > >> > >>>> >
> > >> > >>>> > in FLIP-138 we want to rework the way the JobMaster acquires
> > >> slots,
> > >> > >>>> such
> > >> > >>>> > that required resources are declared before a job is scheduled and
> > >> > >>>> > the job execution is adjusted according to the provided resources
> > >> (e.g.,
> > >> > >>>> > reducing parallelism), instead of asking for a fixed number
> of
> > >> > >>>> resources
> > >> > >>>> > during scheduling and failing midway through if not enough
> > >> resources
> > >> > >>>> are
> > >> > >>>> > available.
> > >> > >>>> >
> > >> > >>>> > This is a stepping stone towards reactive mode, where Flink
> > will
> > >> > >>>> > automatically make use of new TaskExecutors being started.
> > >> > >>>> >
> > >> > >>>> > More details can be found here
> > >> > >>>> > <
> > >> > >>>>
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management
> > >> > >>>> >
> > >> > >>>> > .
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>> >
> > >> > >>>>
> > >> > >>>
> > >> > >>>
> > >> > >>
> > >> >
> > >>
> > >
> >
>

Re: [DISCUSS] FLIP-138: Declarative Resource management

Posted by Xintong Song <to...@gmail.com>.
Thanks Till, the changes look good to me. Looking forward to the vote.

Thank you~

Xintong Song



On Fri, Sep 4, 2020 at 12:31 AM Till Rohrmann <tr...@apache.org> wrote:

> Thanks for the feedback Xintong and Zhu Zhu. I've added a bit more details
> for the intended interface extensions, potential follow ups (removing the
> AllocationIDs) and the question about whether to reuse or return a slot if
> the profiles don't fully match.
>
> If nobody objects, then I would start a vote for this FLIP soon.
>
> Cheers,
> Till
>
> On Mon, Aug 31, 2020 at 11:53 AM Zhu Zhu <re...@gmail.com> wrote:
>
> > Thanks for the clarification @Till Rohrmann <tr...@apache.org>
> >
> > >> # Implications for the scheduling
> > Agreed that it turned out to be different execution strategies for batch
> > jobs.
> > We can have a simple one first and improve it later.
> >
> > Thanks,
> > Zhu
> >
> > On Mon, Aug 31, 2020 at 3:05 PM Xintong Song <to...@gmail.com> wrote:
> >
> >> Thanks for the clarification, @Till.
> >>
> >> - For FLIP-56, sounds good to me. I think there should be no problem
> >> before
> >> removing AllocationID. And even after replacing AllocationID, it should
> >> only require limited effort to make FLIP-56 work with SlotID. I was just
> >> trying to understand when the effort will be needed.
> >>
> >> - For offer/release slots between JM/TM, I think you are right.
> >> Waiting on the confirmation for resource requirement decrease before
> >> freeing the slot is quite equivalent to releasing slots through RM, in
> >> terms of it practically preventing JM from releasing slots when the RM
> is
> >> absent. But this approach obviously requires less change to the current
> >> mechanism.
> >> Since the first problem can be solved by the declarative protocol, and
> the
> >> second problem can be addressed by this confirmation based approach,
> ATM I
> >> don't see any strong reason for changing to offering and releasing slots
> >> through RM, especially considering the significant changes it requires.
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Fri, Aug 28, 2020 at 10:07 PM Till Rohrmann <tr...@apache.org>
> >> wrote:
> >>
> >> > Thanks for creating this FLIP @Chesnay and the good input @Xintong and
> >> @Zhu
> >> > Zhu.
> >> >
> >> > Let me try to add some comments concerning your questions:
> >> >
> >> > # FLIP-56
> >> >
> >> > I think there is nothing fundamentally contradicting FLIP-56 in the
> FLIP
> >> > for declarative resource management. As Chesnay said, we have to keep
> >> the
> >> > AllocationID around as long as we have the old scheduler
> implementation.
> >> > Once it is replaced, we can think about using the SlotID instead of
> >> > AllocationIDs for identifying allocated slots. For dynamic slots we
> can
> >> > keep the special meaning of a SlotID with a negative index. In the
> >> future
> >> > we might think about making this encoding a bit more explicit by
> >> sending a
> >> > richer slot request object and reporting the actual SlotID back to the
> >> RM.
> >> >
> >> > For the question of resource utilization vs. deployment latency I
> >> believe
> >> > that this will be a question of requirements and preferences as you've
> >> said
> >> > Xintong. I can see that we will have different strategies to fulfill
> the
> >> > different needs.
> >> >
> >> > # Offer/free slots between JM/TM
> >> >
> >> > You are right Xintong that the existing slot protocol was developed
> with
> >> > the assumption in mind that the RM and JM can run in separate
> processes
> >> and
> >> > that a failure of the RM should only affect the JM in the sense that
> it
> >> > cannot ask for more resources. I believe that one could simplify
> things
> >> a
> >> > bit under the assumption that the RM and JM are always colocated in
> the
> >> > same process. However, the discussion whether to change it or not
> should
> >> > indeed be a separate one.
> >> >
> >> > Changing the slot protocol to a declarative resource management should
> >> > already solve the first problem you have described because we won't
> ask
> >> for
> >> > new slots in case of a failover but simply keep the same resource
> >> > requirements declared and let the RM make sure that we will receive at
> >> > least this amount of slots.
> >> >
> >> > If releasing a slot should lead to allocating new resources because
> >> > decreasing the resource requirement declaration takes longer than
> >> releasing
> >> > the slot on the TM, then we could apply what Chesnay said. Waiting on
> >> > the confirmation of the resource requirement decrease and only then
> >> > freeing the slot on the TM gives you effectively the same behaviour as
> >> > if the freeing of the slot were done by the RM.
> >> >
> >> > I am not entirely sure whether allocating the slots and receiving the
> >> slot
> >> > offers through the RM will allow us to get rid of the pending slot
> >> state on
> >> > the RM side. If the RM needs to communicate with the TM and we want to
> >> have
> >> > a reconciliation protocol between these components, then I think we
> >> would
> >> > have to solve the exact same problem of currently waiting on the TM
> for
> >> > confirming that a slot has been allocated.
> >> >
> >> > # Implications for the scheduling
> >> >
> >> > The FLIP does not fully cover the changes for the scheduler and mainly
> >> > drafts the rough idea. For the batch scheduling, I believe that we
> have
> >> a
> >> > couple degrees of freedom in how to do things. In the scenario you
> >> > described, one could choose a simple strategy where we wait for all
> >> > producers to stop before deciding on the parallelism of the consumer
> and
> >> > scheduling the respective tasks (even though they have POINTWISE
> >> BLOCKING
> >> > edges). Or we can try to be smart and say that if we get at least one
> >> > slot, we can run the consumers with the same parallelism as the
> >> > producers; it just might be that we have to run them one after another
> >> > in a single slot. One advantage of not directly scheduling the first
> >> > consumer when the first producer is finished is that one might schedule
> >> > the consumer stage with a higher parallelism because one might acquire
> >> > more resources a bit later.
> >> > But I would see this as different execution strategies which have
> >> different
> >> > properties.
> >> >
> >> > Cheers,
> >> > Till
> >> >
> >> > On Fri, Aug 28, 2020 at 11:21 AM Zhu Zhu <re...@gmail.com> wrote:
> >> >
> >> > > Thanks for the explanation @Chesnay Schepler <ch...@apache.org> .
> >> > >
> >> > > Yes, for batch jobs it can be safe to schedule downstream vertices
> if
> >> > > there
> >> > > are enough slots in the pool, even if these slots are still in use
> at
> >> > that
> >> > > moment.
> >> > > And the job can still progress even if the vertices stick to the
> >> original
> >> > > parallelism.
> >> > >
> >> > > It looks to me like several decisions can differ between streaming
> >> > > and batch jobs.
> >> > > Looking forward to the follow-up FLIP on the lazy ExecutionGraph
> >> > > construction!
> >> > >
> >> > > Thanks,
> >> > > Zhu
> >> > >
> >> > > On Fri, Aug 28, 2020 at 4:35 PM Chesnay Schepler <ch...@apache.org> wrote:
> >> > >
> >> > >> Maybe :)
> >> > >>
> >> > >> Imagine a case where the producer and consumer have the same
> >> > >> ResourceProfile, or at least one where the consumer requirements
> are
> >> > less
> >> > >> than the producer ones.
> >> > >> In this case, the scheduler can happily schedule consumers, because
> >> it
> >> > >> knows it will get enough slots.
> >> > >>
> >> > >> If the profiles are different, then the Scheduler _may_ wait for
> >> > >> numberOf(producer) slots; it _may_ also stick with the parallelism
> >> > >> and schedule right away, in the worst case running the consumers in
> >> > >> sequence.
> >> > >> In fact, for batch jobs there is probably(?) never a reason for the
> >> > >> scheduler to _reduce_ the parallelism; it can always try to run
> >> things
> >> > in
> >> > >> sequence if it doesn't get enough slots.
> >> > >> Reducing the parallelism would just mean that you'd have to wait
> for
> >> > more
> >> > >> producers to finish.
> >> > >>
> >> > >> The scope of this FLIP is just the protocol, without changes to the
> >> > >> scheduler; in other words just changing how slots are acquired, but
> >> > change
> >> > >> nothing about the scheduling. That is tackled in a follow-up FLIP.
> >> > >>
> >> > >> On 28/08/2020 07:34, Zhu Zhu wrote:
> >> > >>
> >> > >> Thanks for the response!
> >> > >>
> >> > >> >> The scheduler doesn't have to wait for one stage to finish
> >> > >> Does it mean we will declare resources and decide the parallelism
> >> for a
> >> > >> stage which is partially
> >> > >> schedulable, i.e. when input data are ready just for part of the
> >> > >> execution vertices?
> >> > >>
> >> > >> >> This will get more complicated once we allow the scheduler to change
> >> > >> >> the parallelism while the job is running
> >> > >> Agreed. Looks to me it's a problem for batch jobs only and can be avoided
> >> > >> for streaming jobs.
> >> > >> Will this FLIP limit its scope to streaming jobs, with improvements for
> >> > >> batch jobs to be done later?
> >> > >>
> >> > >> Thanks,
> >> > >> Zhu
> >> > >>
> >> > >> Chesnay Schepler <ch...@apache.org> wrote on Fri, Aug 28, 2020 at 2:27 AM:
> >> > >>
> >> > >>> The scheduler doesn't have to wait for one stage to finish. It is still
> >> > >>> aware that the upstream execution vertex has finished, and can
> >> > >>> request/use slots accordingly to schedule the consumer.
> >> > >>>
> >> > >>> This will get more complicated once we allow the scheduler to change the
> >> > >>> parallelism while the job is running, for which we will need some
> >> > >>> enhancements to the network stack to allow the producer to run without
> >> > >>> knowing the consumer parallelism ahead of time. I'm not too clear on the
> >> > >>> details, but we'll need some form of keygroup-like approach for
> >> > >>> subpartitions (maxParallelism and all that).
> >> > >>>
> >> > >>> On 27/08/2020 20:05, Zhu Zhu wrote:
> >> > >>>
> >> > >>> Thanks Chesnay & Till for proposing this improvement.
> >> > >>> It's of good value to allow jobs to make the best use of available
> >> > >>> resources adaptively. Not to mention it further supports reactive mode.
> >> > >>> So big +1 for it.
> >> > >>>
> >> > >>> I have a minor concern about a possible regression in certain cases due
> >> > >>> to the proposed JobVertex-wise scheduling, which replaces the current
> >> > >>> ExecutionVertex-wise scheduling.
> >> > >>> In the proposal, it looks to me like a stage must finish before its
> >> > >>> consumer stage can be scheduled. This limitation, however, does not
> >> > >>> exist in the current scheduler. If there is a POINTWISE BLOCKING edge,
> >> > >>> the downstream execution region can be scheduled right after its
> >> > >>> connected upstream execution vertices finish, even before the whole
> >> > >>> upstream stage finishes. This allows the region to be launched earlier
> >> > >>> and make use of available resources.
> >> > >>> Do we need to let the new scheduler retain this property?
> >> > >>>
> >> > >>> Thanks,
> >> > >>> Zhu
> >> > >>>
> >> > >>> Xintong Song <to...@gmail.com> wrote on Wed, Aug 26, 2020 at 6:59 PM:
> >> > >>>
> >> > >>>> Thanks for the quick response.
> >> > >>>>
> >> > >>>> *Job prioritization, Allocation IDs, Minimum resource
> >> > >>>> requirements, SlotManager Implementation Plan:* Sounds good to me.
> >> > >>>>
> >> > >>>> *FLIP-56*
> >> > >>>> Good point about the trade-off. I believe maximum resource utilization
> >> > >>>> and quick deployment are desired in different scenarios. E.g., a
> >> > >>>> long-running streaming job can tolerate some deployment latency to
> >> > >>>> improve resource utilization, which benefits the entire lifecycle of the
> >> > >>>> job. On the other hand, short batch queries may prefer quick deployment;
> >> > >>>> otherwise the time for resource allocation might significantly increase
> >> > >>>> the response time.
> >> > >>>> It would be good enough for me to bring these questions to attention.
> >> > >>>> Nothing that I'm aware of should block this FLIP.
> >> > >>>>
> >> > >>>> Thank you~
> >> > >>>>
> >> > >>>> Xintong Song
> >> > >>>>
> >> > >>>>
> >> > >>>>
> >> > >>>> On Wed, Aug 26, 2020 at 5:14 PM Chesnay Schepler <chesnay@apache.org> wrote:
> >> > >>>>
> >> > >>>> > Thank you Xintong for your questions!
> >> > >>>> >
> >> > >>>> > Job prioritization
> >> > >>>> >
> >> > >>>> > Yes, the job which declares its initial requirements first is
> >> > >>>> > prioritized. This is very much for simplicity; for example, this avoids
> >> > >>>> > the nasty case where all jobs get some resources, but none get enough
> >> > >>>> > to actually run the job.
> >> > >>>> >
> >> > >>>> > Minimum resource requirements
> >> > >>>> >
> >> > >>>> > My bad; at some point we want to allow the JobMaster to declare a
> >> > >>>> > range of resources it could use to run a job, for example min=1,
> >> > >>>> > target=10, max=+inf.
> >> > >>>> >
> >> > >>>> > With this model, the RM would then try to balance the resources such
> >> > >>>> > that as many jobs as possible are as close to the target state as
> >> > >>>> > possible.
> >> > >>>> >
> >> > >>>> > Currently, the minimum/target/maximum resources are all the same. So
> >> > >>>> > the notification is sent whenever the current requirements cannot be
> >> > >>>> > met.
> >> > >>>> >
> >> > >>>> > Allocation IDs
> >> > >>>> > We do intend to, at the very least, remove AllocationIDs on the
> >> > >>>> > SlotManager side, as they are just not required there.
> >> > >>>> >
> >> > >>>> > On the SlotPool side we have to keep them around at least until the
> >> > >>>> > existing SlotPool implementations are removed (not sure whether we'll
> >> > >>>> > fully commit to this in 1.12), since the interfaces use AllocationIDs,
> >> > >>>> > which also bleed into the JobMaster.
> >> > >>>> > The TaskExecutor is in a similar position.
> >> > >>>> > But in the long term, yes, they will be removed, and most usages will
> >> > >>>> > probably be replaced by the SlotID.
> >> > >>>> >
> >> > >>>> > FLIP-56
> >> > >>>> >
> >> > >>>> > Dynamic slot allocations are indeed quite interesting and raise a few
> >> > >>>> > questions; for example, their main purpose is to ensure maximum
> >> > >>>> > resource utilization. In that case, should the JobMaster be allowed to
> >> > >>>> > re-use a slot if the task requires fewer resources than the slot
> >> > >>>> > provides, or should it always request a new slot that exactly matches?
> >> > >>>> >
> >> > >>>> > There is a trade-off to be made between maximum resource utilization
> >> > >>>> > (request exactly matching slots, and only re-use exact matches) and
> >> > >>>> > quicker job deployment (re-use slots even if they don't exactly match,
> >> > >>>> > skip the round-trip to the RM).
> >> > >>>> >
> >> > >>>> > As for how to handle the lack of preemptively known SlotIDs, that
> >> > >>>> > should be fine in and of itself; we already handle a similar case when
> >> > >>>> > we request a new TaskExecutor to be started. So long as there is some
> >> > >>>> > way to know how many resources the TaskExecutor has in total, I do not
> >> > >>>> > see a problem at the moment. We will get the SlotID eventually by
> >> > >>>> > virtue of the heartbeat SlotReport.
> >> > >>>> >
> >> > >>>> > Implementation plan (SlotManager)
> >> > >>>> > You are on the right track. The SlotManager tracks the declared
> >> > >>>> > resource requirements, and if the requirements increase it creates a
> >> > >>>> > SlotRequest, which then goes through similar code paths as we have at
> >> > >>>> > the moment (try to find a free slot; if found, tell the TM; otherwise
> >> > >>>> > try to request a new TM).
> >> > >>>> > The SlotManager changes needed to get a working version are not that
> >> > >>>> > substantial; we have a PoC, and most of the work went into refactoring
> >> > >>>> > the SlotManager into a more manageable state (split into several
> >> > >>>> > components, stricter and simplified slot life-cycle, ...).
> >> > >>>> >
> >> > >>>> > Offer/free slots between JM/TM
> >> > >>>> > Gotta run, but that's a good question and I'll think about it. But I
> >> > >>>> > think it comes down to making fewer changes, and being able to leverage
> >> > >>>> > existing reconciliation protocols.
> >> > >>>> > Do note that TaskExecutors also explicitly inform the RM about freed
> >> > >>>> > slots; the heartbeat slot report is just a safety net.
> >> > >>>> > I'm not sure whether slot requests are able to overtake a slot
> >> > >>>> > release; @till do you have thoughts on that?
> >> > >>>> > As for the race condition between the requirements reduction and slot
> >> > >>>> > release, if we run into problems we have the backup plan of only
> >> > >>>> > releasing the slot after the requirement reduction has been
> >> > >>>> > acknowledged.
> >> > >>>> >
> >> > >>>> > On 26/08/2020 10:31, Xintong Song wrote:
> >> > >>>> >
> >> > >>>> > Thanks for preparing the FLIP and driving this discussion, @Chesnay &
> >> > >>>> > @Till.
> >> > >>>> >
> >> > >>>> > I really like the idea. I see great value in the proposed declarative
> >> > >>>> > resource management, in terms of flexibility, usability and efficiency.
> >> > >>>> >
> >> > >>>> > I have a few comments and questions regarding the FLIP design. In
> >> > >>>> > general, the protocol design makes good sense to me. My main concern is
> >> > >>>> > that it is not very clear to me what changes are required from the
> >> > >>>> > Resource/SlotManager side to adapt to the new protocol.
> >> > >>>> >
> >> > >>>> > *1. Distributed slots across different jobs*
> >> > >>>> >
> >> > >>>> > Jobs which register their requirements first, will have precedence over
> >> > >>>> > other jobs also if the requirements change during the runtime.
> >> > >>>> >
> >> > >>>> > Just trying to understand, does this mean jobs are prioritized by the
> >> > >>>> > order of their first resource declaration?
> >> > >>>> >
> >> > >>>> > *2. AllocationID*
> >> > >>>> >
> >> > >>>> > Is this FLIP suggesting to completely remove AllocationID?
> >> > >>>> >
> >> > >>>> > I'm fine with this change. It seems that wherever AllocationID is used,
> >> > >>>> > it can either be removed or be replaced by JobID. This reflects the
> >> > >>>> > concept that slots are now assigned to a job instead of its individual
> >> > >>>> > slot requests.
> >> > >>>> >
> >> > >>>> > I would like to bring to attention that this also requires changes on
> >> > >>>> > the TM side, with respect to FLIP-56[1].
> >> > >>>> >
> >> > >>>> > In the context of dynamic slot allocation introduced by FLIP-56, slots
> >> > >>>> > do not pre-exist on the TM and are dynamically created when the RM calls
> >> > >>>> > TaskExecutorGateway.requestSlot. Since neither the slots nor their
> >> > >>>> > SlotIDs pre-exist, the RM requests slots from the TM with a special
> >> > >>>> > SlotID (negative slot index). The semantics change from "requesting the
> >> > >>>> > slot identified by the given SlotID" to "requesting a slot with the
> >> > >>>> > given resource profile". The AllocationID is used for identifying the
> >> > >>>> > dynamic slots in such cases.
> >> > >>>> >
> >> > >>>> > From the perspective of FLIP-56 and fine-grained resource management,
> >> > >>>> > I'm fine with removing AllocationID. In the meantime, we would need the
> >> > >>>> > TM to recognize the special negative-indexed SlotID and generate a new
> >> > >>>> > unique SlotID for identifying the slot.
> >> > >>>> >
> >> > >>>> > *3. Minimum resource requirement*
> >> > >>>> >
> >> > >>>> > However, we can let the JobMaster know if we cannot fulfill the minimum
> >> > >>>> > resource requirement for a job after
> >> > >>>> > resourcemanager.standalone.start-up-time has passed.
> >> > >>>> >
> >> > >>>> > What is the "minimum resource requirement for a job"? Did I overlook
> >> > >>>> > anything?
> >> > >>>> >
> >> > >>>> > *4. Offer/free slots between JM/TM*
> >> > >>>> >
> >> > >>>> > This probably deserves a separate discussion thread. Just want to bring
> >> > >>>> > it up.
> >> > >>>> >
> >> > >>>> > The idea has been on my mind for quite some time. Is this design, in
> >> > >>>> > which the JM requests resources from the RM while accepting/releasing
> >> > >>>> > resources from/to the TM, the right thing?
> >> > >>>> >
> >> > >>>> > The pain point is that events of the JM's activities
> >> > >>>> > (requesting/releasing resources) arrive at the RM out of order. This
> >> > >>>> > leads to several problems.
> >> > >>>> >
> >> > >>>> >    - When a job fails and task cancelation takes long, some of the slots
> >> > >>>> >    might be released from the slot pool due to being unused for a while.
> >> > >>>> >    Then the job restarts and requests these slots again. At this time,
> >> > >>>> >    the RM may receive slot requests before noticing from TM heartbeats
> >> > >>>> >    that the previous slots are released, thus requesting new resources.
> >> > >>>> >    I've seen many times that the Yarn cluster has a heavy load and is not
> >> > >>>> >    allocating resources quickly enough, which leads to slot request
> >> > >>>> >    timeouts and job failover, and during the failover more resources are
> >> > >>>> >    requested, which adds more load to the Yarn cluster. Happily, this
> >> > >>>> >    should be improved with the declarative resource management. :)
> >> > >>>> >    - As described in this FLIP, it is possible that the RM learns of the
> >> > >>>> >    release of slots from the TM heartbeat before noticing the decrease in
> >> > >>>> >    resource requirements; it may then allocate more resources which need
> >> > >>>> >    to be released soon.
> >> > >>>> >    - It complicates the ResourceManager/SlotManager by requiring an
> >> > >>>> >    additional slot state PENDING, which means the slot has been assigned
> >> > >>>> >    by the RM but the assignment has not yet been confirmed by the TM.
> >> > >>>> >
> >> > >>>> > Why not just make the RM offer the allocated resources (TM address,
> >> > >>>> > SlotID, etc.) to the JM, and have the JM release resources to the RM, so
> >> > >>>> > that for all resource management the JM talks to the RM, and for task
> >> > >>>> > deployment and execution it talks to the TM?
> >> > >>>> >
> >> > >>>> > I tried to understand the benefits of the current design, and found
> >> > >>>> > the following in FLIP-6[2].
> >> > >>>> >
> >> > >>>> >
> >> > >>>> > All that the ResourceManager does is negotiate between the
> >> > >>>> > cluster-manager, the JobManager, and the TaskManagers. Its state can
> >> > >>>> > hence be reconstructed from re-acquiring containers and re-registration
> >> > >>>> > from JobManagers and TaskManagers
> >> > >>>> >
> >> > >>>> > Correct me if I'm wrong, but it seems the original purpose is to make
> >> > >>>> > sure the assignment between jobs and slots is confirmed between the JM
> >> > >>>> > and TMs, so that failures of the RM will not lead to any inconsistency.
> >> > >>>> > However, this only benefits scenarios where the RM fails while the JM
> >> > >>>> > and TMs live. Currently, the JM and RM are in the same process. We do
> >> > >>>> > not really have any scenario where the RM fails alone. We might separate
> >> > >>>> > the JM and RM into different processes in the future, but as far as I
> >> > >>>> > can see we don't have such requirements at the moment. It seems to me
> >> > >>>> > that we are suffering the current problems for the sake of potential
> >> > >>>> > future benefits.
> >> > >>>> >
> >> > >>>> > Maybe I overlooked something.
> >> > >>>> >
> >> > >>>> > *5. Implementation Plan*
> >> > >>>> >
> >> > >>>> > For SlotPool, it sounds quite straightforward to "aggregate individual
> >> > >>>> > slot requests".
> >> > >>>> >
> >> > >>>> > For Resource/SlotManager, it seems there are quite a lot of changes
> >> > >>>> > needed, with the removal of individual slot requests and AllocationID.
> >> > >>>> > It's not clear to me what the first-step plan for RM/SM is. Do we
> >> > >>>> > internally treat the resource requirements as individual slot requests
> >> > >>>> > as the first step, so only the interfaces are changed? Or do we actually
> >> > >>>> > change (practically re-write) the slot allocation logic?
> >> > >>>> >
> >> > >>>> > Thank you~
> >> > >>>> >
> >> > >>>> > Xintong Song
> >> > >>>> >
> >> > >>>> >
> >> > >>>> > [1]
> >> > >>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-56%3A+Dynamic+Slot+Allocation
> >> > >>>> > [2]
> >> > >>>> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077
> >> > >>>> >
> >> > >>>> > On Tue, Aug 25, 2020 at 4:48 PM Chesnay Schepler <chesnay@apache.org> wrote:
> >> > >>>> >
> >> > >>>> >
> >> > >>>> > Hello,
> >> > >>>> >
> >> > >>>> > in FLIP-138 we want to rework the way the JobMaster acquires slots,
> >> > >>>> > such that required resources are declared before a job is scheduled and
> >> > >>>> > the job execution is adjusted according to the provided resources
> >> > >>>> > (e.g., reducing parallelism), instead of asking for a fixed number of
> >> > >>>> > resources during scheduling and failing midway through if not enough
> >> > >>>> > resources are available.
> >> > >>>> >
> >> > >>>> > This is a stepping stone towards reactive mode, where Flink
> will
> >> > >>>> > automatically make use of new TaskExecutors being started.
> >> > >>>> >
> >> > >>>> > More details can be found here:
> >> > >>>> > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management>
> >> > >>>> >
> >> > >>>> >
> >> > >>>> >
> >> > >>>>
> >> > >>>
> >> > >>>
> >> > >>
> >> >
> >>
> >
>
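
For readers skimming the quoted thread above: the "job prioritization" rule
discussed there (the job that declares its requirements first keeps
precedence, also when its requirements change later) can be illustrated with
a small Java sketch. This is only an illustration under stated assumptions,
not Flink's SlotManager. The class name DeclaredRequirements and the methods
declare, assign and missing are hypothetical, and slots are treated as
interchangeable (no ResourceProfile matching).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of first-come-first-served requirement tracking; not Flink code. */
final class DeclaredRequirements {
    // Insertion order records which job declared its requirements first.
    private final Map<String, Integer> requiredSlots = new LinkedHashMap<>();

    /** Declares (or updates) the total number of slots a job needs. */
    void declare(String jobId, int slots) {
        if (slots < 0) {
            throw new IllegalArgumentException("slots must be >= 0");
        }
        // Re-inserting an existing key keeps its original position, so a job
        // that changes its requirements at runtime keeps its precedence.
        requiredSlots.put(jobId, slots);
    }

    /** Hands out free slots job by job, in order of first declaration. */
    Map<String, Integer> assign(int freeSlots) {
        Map<String, Integer> assigned = new LinkedHashMap<>();
        int remaining = freeSlots;
        for (Map.Entry<String, Integer> entry : requiredSlots.entrySet()) {
            int granted = Math.min(entry.getValue(), remaining);
            assigned.put(entry.getKey(), granted);
            remaining -= granted;
        }
        return assigned;
    }

    /** Number of slots still missing across all jobs (what would have to be requested). */
    int missing(int freeSlots) {
        int total = 0;
        for (int slots : requiredSlots.values()) {
            total += slots;
        }
        return Math.max(0, total - freeSlots);
    }
}
```

Serving jobs strictly in declaration order means the earliest job is
satisfied in full before later jobs receive anything, which avoids the case
mentioned in the thread where every job gets some slots but none gets enough
to run.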