Posted to dev@airflow.apache.org by Jarek Potiuk <ja...@potiuk.com> on 2022/04/01 13:24:15 UTC

Re: [DISCUSSION] let scheduler heal tasks stuck in queued state

I think we all very much agree that first we need to define
responsibilities for those components :). How about someone starts a nice
design doc, with graphs etc., that we can discuss :) ?
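
To make the "only one component is responsible for each state" rule quoted
below concrete, here is a minimal sketch such a design doc could start from.
Everything in it is an assumption for illustration: the Component names, the
STATE_OWNER mapping and check_transition() are hypothetical, and the
assignment of states to components is not a description of how Airflow
currently behaves.

```python
from enum import Enum


class Component(Enum):
    SCHEDULER = "scheduler"
    EXECUTOR = "executor"
    TASK_RUNNER = "task_runner"  # hypothetical name for the local task job / raw process
    TRIGGERER = "triggerer"


# Hypothetical assignment: exactly one owner per task instance state.
STATE_OWNER = {
    "scheduled": Component.SCHEDULER,
    "queued": Component.EXECUTOR,
    "running": Component.TASK_RUNNER,
    "deferred": Component.TRIGGERER,
}


class OwnershipError(RuntimeError):
    """Raised when a component tries to move a task out of a state it does not own."""


def check_transition(component: Component, current_state: str) -> None:
    """Allow a state change only if `component` owns `current_state`."""
    owner = STATE_OWNER.get(current_state)
    if owner is None:
        raise OwnershipError(f"no owner defined for state {current_state!r}")
    if owner is not component:
        raise OwnershipError(
            f"{component.value} may not update a task in state {current_state!r}; "
            f"owner is {owner.value}"
        )
```

With a table like this, a stuck "queued" task is by definition the executor's
problem, and any other component touching it fails loudly instead of racing.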

On Thu, Mar 31, 2022 at 11:11 PM Ping Zhang <pi...@umich.edu> wrote:
>>
>> Airflow would benefit overall from a clearer distinction of what part (scheduler, executor, task handler, local task job, triggerer, etc.) is responsible for updates to each state so we start getting a clearer picture
>
>
> Andrew, this is exactly the point. Currently, there is no clear interface for state change handling in each component. The main reason I proposed using the scheduler to deal with the stuck queued state is that the executor currently does not handle TI (task instance) state changes, and executor events are handled by the scheduler. I think we should have a clear interface and responsibility for each component (scheduler, executor, airflow run --local process, airflow run --raw process, triggerer) first, before we decide on the best approach.
>
> Thanks,
>
> Ping
>
>
> On Fri, Mar 25, 2022 at 9:19 AM Andrew Godwin <an...@astronomer.io.invalid> wrote:
>>
>> Yes, that was roughly the idea, Jarek - if the executor is only running inside the scheduler and has no external process component, it'd be nice to have a part of the "executor interface" that gets called periodically for cleanup (QUEUED or otherwise). In my internal executor experiments, we've had to use a separate process for this, though that has its own advantages.
>>
>> I think one good thing to establish, though, would be that only executor code touches task instances in that state (as part of a general overall rule that only one component is responsible for each state) - I think Airflow would benefit overall from a clearer distinction of what part (scheduler, executor, task handler, local task job, triggerer, etc.) is responsible for updates to each state so we start getting a clearer picture of where any bugs could be in distributed state machine terms.
>>
>> Andrew
>>
>> On Thu, Mar 24, 2022 at 7:12 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>
>>> > 2. scheduler invokes that method periodically.
>>>
>>> I think this is not the right approach. I think I see what Andrew
>>> means here, but I think we should not assume that the scheduler will
>>> periodically call some method. Depending on the executor
>>> implementation (say, for example, a future Fargate Executor or Cloud Run
>>> executor), cleaning queued tasks might actually be done differently
>>> (there might be a notification in the executor itself for the tasks that
>>> are queued and stuck, and the Scheduler might not need to periodically
>>> query it).
>>>
>>> I'd say a better approach (and possibly, Andrew, that's what you had in
>>> mind) is to have a separate method in the "executor" protocol -
>>> "start_cleanup_of_queued_tasks()". One implementation of it (the
>>> one in BaseExecutor now) could do periodic cleanup, but the future
>>> Fargate Executor could have it implemented differently.
>>>
>>> I think we already have a few methods like that in BaseExecutor that
>>> also have some implementation that will not really be useful in other
>>> executors, so deriving an executor from BaseExecutor means inheriting
>>> some implementation that will likely need to be overridden. I think we
>>> should start with what Andrew proposed (I think): take the existing
>>> executors, really extract an "ExecutorProtocol", possibly add an
>>> ExecutorMixin (or even a few) to add some common behaviour for
>>> executors, and make sure we got it right - probably at the time we (or
>>> someone else) write a new executor. Just to make sure we are not trying
>>> to make "common" code for something that is not really "common".
>>>
>>> But maybe I am misinterpreting the intentions :)
>>>
>>> J.
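
The protocol-plus-mixin idea above can be sketched roughly as follows. This
is only an illustration under stated assumptions: the method name
start_cleanup_of_queued_tasks() comes from the thread, but ExecutorProtocol's
exact shape, PollingCleanupMixin, NotifiedExecutor and every helper method
here are hypothetical and do not exist in Airflow.

```python
import threading
from typing import Callable, List, Protocol


class ExecutorProtocol(Protocol):
    """Hypothetical protocol; only the method name comes from the thread."""

    def start_cleanup_of_queued_tasks(self) -> None: ...


class PollingCleanupMixin:
    """Periodic cleanup, the behaviour suggested for BaseExecutor-style executors."""

    cleanup_interval_seconds = 60.0

    def find_stuck_queued_tasks(self) -> List[str]:
        raise NotImplementedError

    def fail_or_requeue(self, task_key: str) -> None:
        raise NotImplementedError

    def cleanup_once(self) -> List[str]:
        """Run a single cleanup pass and return the task keys that were handled."""
        stuck = self.find_stuck_queued_tasks()
        for task_key in stuck:
            self.fail_or_requeue(task_key)
        return stuck

    def start_cleanup_of_queued_tasks(self) -> None:
        self._stop_cleanup = threading.Event()

        def loop() -> None:
            # Event.wait returns False on timeout and True once the event is set.
            while not self._stop_cleanup.wait(self.cleanup_interval_seconds):
                self.cleanup_once()

        threading.Thread(target=loop, daemon=True).start()


class NotifiedExecutor:
    """Executor whose backend pushes "stuck" notifications, so nothing polls."""

    def __init__(self, subscribe: Callable[[Callable[[str], None]], None]) -> None:
        self._subscribe = subscribe
        self.handled: List[str] = []

    def start_cleanup_of_queued_tasks(self) -> None:
        # Register a callback once; the backend calls it for each stuck task.
        self._subscribe(self.handled.append)
```

A scheduler written against such a protocol would call
start_cleanup_of_queued_tasks() once at startup and leave the "how" to each
executor, which is the point of not assuming periodic calls from the
scheduler.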

Re: [DISCUSSION] let scheduler heal tasks stuck in queued state

Posted by Ping Zhang <pi...@umich.edu>.
Cool, I can start to work on it.

Thanks,

Ping

