Posted to dev@airflow.apache.org by Ping Zhang <pi...@umich.edu> on 2022/03/15 21:33:07 UTC

[DISCUSSION] let scheduler heal tasks stuck in queued state

Hi all,

Currently, tasks can get stuck in the `queued` state, where they are neither
rescheduled by the scheduler nor picked up by a worker. This can happen
when a failure occurs after a task is marked as `queued` but before the
executor marks it as `running`.

There is a fix, https://github.com/apache/airflow/pull/19769/files, for the
Celery executor. However, it only targets the CeleryExecutor, and it leaks
scheduler responsibility into the executor.

We propose to move the state reconciliation logic to the scheduler. The
proposed change:

1. when getting the _executable_task_instances_to_queued (
https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L287),
also include the QUEUED state, like:
    .filter(TI.state.in_([TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED]))
In this way, the scheduler can process queued tis.

2. In the queue_command method in the base executor, conditionally queue
the ti even if its state is QUEUED. The condition is based on the time the
executor last enqueued it, with a configurable threshold.

This could potentially send the same task to the executor twice. However,
on the worker side there are already checks on whether a task can run or
not, so this won't cause issues.

We have been running this in our prod for a while and would love to
contribute it.

Please let me know your thoughts.

Thanks,

Ping

Re: [DISCUSSION] let scheduler heal tasks stuck in queued state

Posted by Ping Zhang <pi...@umich.edu>.
Cool, I can start to work on it.

Thanks,

Ping


On Fri, Apr 1, 2022 at 6:24 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> I think we all very much agree that first we need to define
> responsibilities for those :). How about someone starts a nice design
> doc with graphs etc. that we can discuss :)?

Re: [DISCUSSION] let scheduler heal tasks stuck in queued state

Posted by Jarek Potiuk <ja...@potiuk.com>.
I think we all very much agree that first we need to define
responsibilities for those :). How about someone starts a nice design
doc with graphs etc. that we can discuss :)?

On Thu, Mar 31, 2022 at 11:11 PM Ping Zhang <pi...@umich.edu> wrote:
>>
>> Airflow would benefit overall from a clearer distinction of what part (scheduler, executor, task handler, local task job, triggerer, etc.) is responsible for updates to each state so we start getting a clearer picture
>
>
> Andrew, this is exactly the point. Currently, there is no clear interface for state change handling in each component. The main reason I proposed using the scheduler to deal with the stuck queued state is that the executor currently does not handle ti state changes, and executor events are handled by the scheduler. I think we should first define a clear interface and responsibility for each component (scheduler, executor, airflow run --local process, airflow run --raw process, trigger) before we decide on the best approach.
>
> Thanks,
>
> Ping

Re: [DISCUSSION] let scheduler heal tasks stuck in queued state

Posted by Ping Zhang <pi...@umich.edu>.
>
> Airflow would benefit overall from a clearer distinction of what part
> (scheduler, executor, task handler, local task job, triggerer, etc.) is
> responsible for updates to each state so we start getting a clearer picture


Andrew, this is exactly the point. Currently, there is no clear
interface for state change handling in each component. The main reason
I proposed using the scheduler to deal with the stuck queued state is
that the executor currently does not handle ti state changes, and
executor events are handled by the scheduler. I think we should first
define a clear interface and responsibility for each component
(scheduler, executor, airflow run --local process, airflow run --raw
process, trigger) before we decide on the best approach.

Thanks,

Ping


On Fri, Mar 25, 2022 at 9:19 AM Andrew Godwin
<an...@astronomer.io.invalid> wrote:

> Yes, that was roughly the idea Jarek - if the executor is only running
> inside the scheduler and has no external process component, it'd be nice to
> have a part of the "executor interface" that got called periodically for
> cleanup (QUEUED or otherwise). In my internal executor experiments, we've
> had to use a separate process for this, though that has its own advantages.
>
> I think one good thing to establish, though, would be that only executor
> code touches task instances in that state (as part of a general overall
> rule that only one component is responsible for each state) - I think
> Airflow would benefit overall from a clearer distinction of what part
> (scheduler, executor, task handler, local task job, triggerer, etc.) is
> responsible for updates to each state so we start getting a clearer picture
> of where any bugs could be in distributed state machine terms.
>
> Andrew

Re: [DISCUSSION] let scheduler heal tasks stuck in queued state

Posted by Andrew Godwin <an...@astronomer.io.INVALID>.
Yes, that was roughly the idea Jarek - if the executor is only running
inside the scheduler and has no external process component, it'd be nice to
have a part of the "executor interface" that got called periodically for
cleanup (QUEUED or otherwise). In my internal executor experiments, we've
had to use a separate process for this, though that has its own advantages.

I think one good thing to establish, though, would be that only executor
code touches task instances in that state (as part of a general overall
rule that only one component is responsible for each state) - I think
Airflow would benefit overall from a clearer distinction of what part
(scheduler, executor, task handler, local task job, triggerer, etc.) is
responsible for updates to each state so we start getting a clearer picture
of where any bugs could be in distributed state machine terms.
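
To make the "one component per state" rule concrete, here is a rough
sketch. The ownership table is purely hypothetical (the thread does not
settle who owns which state), and Component, STATE_OWNER and
check_transition are made-up names, not Airflow APIs.

```python
from enum import Enum


class Component(Enum):
    SCHEDULER = "scheduler"
    EXECUTOR = "executor"
    TASK_RUNNER = "task runner"


# Hypothetical ownership table: exactly one component may move a task
# instance out of each state. The assignments are illustrative only.
STATE_OWNER = {
    "scheduled": Component.SCHEDULER,
    "queued": Component.EXECUTOR,
    "running": Component.TASK_RUNNER,
}


def check_transition(current_state: str, actor: Component) -> None:
    """Raise if `actor` tries to update a state it does not own."""
    owner = STATE_OWNER.get(current_state)
    if owner is None:
        raise ValueError(f"no owner defined for state {current_state!r}")
    if owner is not actor:
        raise PermissionError(
            f"{actor.value} may not update a task in {current_state!r}; "
            f"that state belongs to the {owner.value}"
        )
```

A guard like this would turn "two components touched the same state" from
a silent race into an explicit error, which is the kind of distributed
state machine reasoning described above.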

Andrew

On Thu, Mar 24, 2022 at 7:12 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> > 2. scheduler invokes that method periodically.
>
> I think this is not the right approach. I think I see what Andrew
> means here, but I think we should not assume that the scheduler will
> periodically call some method. Depending on the executor
> implementation (say, for example, a future Fargate Executor or Cloud Run
> executor), cleaning queued tasks might actually be done differently
> (there might be a notification in the executor itself for the tasks that
> are queued and stuck, and the scheduler might not need to periodically
> query it).
>
> I'd say a better approach (and possibly Andrew that's what you had in
> mind) is to have a separate method in the "executor" protocol -
> "start_cleanup_of_queued_tasks()". And one implementation of it (The
> one in BaseExecutor now) could do periodic cleanup. But the future
> Fargate Executor could have it implemented differently.
>
> I think we already have a few methods like that in BaseExecutor that
> also have some implementation that will not really be useful in other
> executors, so deriving an executor from BaseExecutor means inheriting
> some implementation that will likely need to be overridden. I think we
> should start with what Andrew proposed (I think): take the existing
> executors, extract a real "ExecutorProtocol", possibly add an
> ExecutorMixin (or even a few) to add some common behaviour for
> executors, and make sure we got it right - probably at the time we (or
> someone else) write a new executor. Just to make sure we are not trying
> to make "common" code for something that is not really "common".
>
> But maybe I am misinterpreting the intentions :)
>
> J.
>

Re: [DISCUSSION] let scheduler heal tasks stuck in queued state

Posted by Jarek Potiuk <ja...@potiuk.com>.
> 2. scheduler invokes that method periodically.

I think this is not the right approach. I think I see what Andrew
means here, but I think we should not assume that the scheduler will
periodically call some method. Depending on the executor
implementation (say, for example, a future Fargate Executor or Cloud Run
executor), cleaning queued tasks might actually be done differently
(there might be a notification in the executor itself for the tasks that
are queued and stuck, and the scheduler might not need to periodically
query it).

I'd say a better approach (and possibly Andrew that's what you had in
mind) is to have a separate method in the "executor" protocol -
"start_cleanup_of_queued_tasks()". And one implementation of it (The
one in BaseExecutor now) could do periodic cleanup. But the future
Fargate Executor could have it implemented differently.

I think we already have a few methods like that in BaseExecutor that
also have some implementation that will not really be useful in other
executors, so deriving an executor from BaseExecutor means inheriting
some implementation that will likely need to be overridden. I think we
should start with what Andrew proposed (I think): take the existing
executors, extract a real "ExecutorProtocol", possibly add an
ExecutorMixin (or even a few) to add some common behaviour for
executors, and make sure we got it right - probably at the time we (or
someone else) write a new executor. Just to make sure we are not trying
to make "common" code for something that is not really "common".
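
A rough sketch of what that split could look like, using typing.Protocol.
Only the names ExecutorProtocol and start_cleanup_of_queued_tasks come
from this mail; PeriodicCleanupMixin, NotificationDrivenExecutor,
cleanup_stuck_queued_tasks and the 60-second interval are assumptions for
illustration, not existing Airflow classes.

```python
import threading
from typing import Protocol, runtime_checkable


@runtime_checkable
class ExecutorProtocol(Protocol):
    """Hypothetical minimal executor interface."""

    def start_cleanup_of_queued_tasks(self) -> None: ...


class PeriodicCleanupMixin:
    """One possible implementation: run a cleanup method on a timer."""

    cleanup_interval_seconds: float = 60.0  # assumed default

    def cleanup_stuck_queued_tasks(self) -> None:
        raise NotImplementedError

    def start_cleanup_of_queued_tasks(self) -> None:
        self._stop = threading.Event()

        def _loop() -> None:
            # Event.wait returns False on timeout, so the loop runs one
            # cleanup per interval until stop_cleanup() sets the event.
            while not self._stop.wait(self.cleanup_interval_seconds):
                self.cleanup_stuck_queued_tasks()

        threading.Thread(target=_loop, daemon=True).start()

    def stop_cleanup(self) -> None:
        self._stop.set()


class NotificationDrivenExecutor:
    """An executor whose backend reports stuck tasks needs no polling loop."""

    def __init__(self) -> None:
        self.subscribed = False

    def start_cleanup_of_queued_tasks(self) -> None:
        # Assumption: subscribing to backend events replaces periodic checks.
        self.subscribed = True
```

Both classes satisfy the protocol, but only the mixin commits to periodic
polling, so the scheduler does not need to know how cleanup is done.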

But maybe I am misinterpreting the intentions :)

J.

Re: [DISCUSSION] let scheduler heal tasks stuck in queued state

Posted by Ping Zhang <pi...@umich.edu>.
Hi Andrew and Jarek,

Thanks for the comment.

Airflow is currently designed to be more resilient with at-least-once task
delivery instead of at-most-once, as it already has a singleton execution
guarantee. In fact, all CeleryExecutor users already get at-least-once
delivery, given Celery's message redelivery.

Andrew, regarding your concern about "multiple times over the course of a
small time period": this won't be an issue, since there is a throttle
control. Queued tasks are only re-sent to the executor after they have been
queued for a (configurable) period of time. I may be missing some context:
why does the executor need to treat task requests coming from
reschedule/deferred differently?

To Jarek's point, I agree that we can also have a method
('_clear_stuck_queued_tasks', which can be renamed once the executor
interface is more formalized) in the BaseExecutor that simply handles
this case and is invoked by the scheduler.

So the revised plan will be:

1. have a method defined in the BaseExecutor to clear tasks that have been
stuck in queued for a period of time:
    1.1. rewind the ti from the queued state to the scheduled state
    1.2. clear related in-memory state (for example, self.running) in the
executor.
2. scheduler invokes that method periodically.
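
The revised plan could be sketched as follows. This is a toy model, not
the BaseExecutor: FakeTaskInstance, SketchExecutor, the public method name
clear_stuck_queued_tasks and the stuck_after parameter are all assumed
names, and task instances live in a dict instead of the metadata database.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Set


@dataclass
class FakeTaskInstance:
    """Stand-in for a task instance row; not Airflow's TaskInstance model."""

    key: str
    state: str
    queued_at: datetime


class SketchExecutor:
    def __init__(self) -> None:
        # Keys the executor believes it is currently handling.
        self.running: Set[str] = set()

    def clear_stuck_queued_tasks(
        self,
        task_instances: Dict[str, FakeTaskInstance],
        now: datetime,
        stuck_after: timedelta,  # hypothetical configurable threshold
    ) -> List[str]:
        """Rewind long-queued tis to scheduled; return the cleared keys."""
        cleared = []
        for ti in task_instances.values():
            if ti.state == "queued" and now - ti.queued_at >= stuck_after:
                ti.state = "scheduled"        # step 1.1: rewind the state
                self.running.discard(ti.key)  # step 1.2: drop in-memory state
                cleared.append(ti.key)
        return cleared
```

The scheduler (step 2) would then call clear_stuck_queued_tasks on some
interval, and the rewound tis would go through the normal scheduling path
again instead of being re-sent directly.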


Please let me know your thoughts.

Thanks,

Ping


On Wed, Mar 16, 2022 at 9:35 AM Andrew Godwin
<an...@astronomer.io.invalid> wrote:

> Having thought on this further, I actually think resubmitting into the
> executor would not be feasible, as it's not possible to tell duplicate task
> requests from resuming after reschedule/deferred etc.
>
> In these situations, the scheduler will submit the same task instance -
> with the same try number and all other details the same - multiple times
> over the course of a small time period, which feels like it is going to be
> impossible to tell apart from the same task accidentally coming in twice
> without a lot of extra accounting on the executor side.
>
> As for adding more to the interface - I would love to see Airflow
> formalise the executor interface more rather than start exposing
> private/underscore-prefixed variables. I think adding a loop that solves
> this to Airflow executors that need it is probably a cleaner way of
> handling it - the contract then being "if it's in QUEUED, then it is the
> executor's job to do something with it"
>
> Andrew

Re: [DISCUSSION] let scheduler heal tasks stuck in queued state

Posted by Andrew Godwin <an...@astronomer.io.INVALID>.
Having thought on this further, I actually think resubmitting into the
executor would not be feasible, as it's not possible to tell duplicate task
requests from resuming after reschedule/deferred etc.

In these situations, the scheduler will submit the same task instance -
with the same try number and all other details the same - multiple times
over the course of a small time period, which feels like it is going to be
impossible to tell apart from the same task accidentally coming in twice
without a lot of extra accounting on the executor side.
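
As a small illustration of the problem: if a submission is identified only
by fields like the ones below, a legitimate resume and an accidental
duplicate compare equal. SubmissionKey and its fields are a hypothetical
stand-in, not the executor's real data structure.

```python
from typing import NamedTuple


class SubmissionKey(NamedTuple):
    """Hypothetical identity of a task submission as seen by an executor."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int


# A legitimate resume after a reschedule/deferral ...
resume = SubmissionKey("etl", "wait_for_file", "run_1", 1)
# ... and an accidental duplicate of the same task instance.
duplicate = SubmissionKey("etl", "wait_for_file", "run_1", 1)

# The executor sees identical keys, so it cannot tell the two cases apart
# without extra bookkeeping of its own.
indistinguishable = resume == duplicate
```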

As for adding more to the interface - I would love to see Airflow formalise
the executor interface more rather than start exposing
private/underscore-prefixed variables. I think adding a loop that solves
this to Airflow executors that need it is probably a cleaner way of
handling it - the contract then being "if it's in QUEUED, then it is the
executor's job to do something with it"

Andrew


On Tue, Mar 15, 2022, 16:11 Jarek Potiuk <ja...@potiuk.com> wrote:

> Shouldn't we simply make "_clear_stuck_queued_tasks" part of the executor
> interface, and have the scheduler trigger it? I guess that
> might solve it in a "portable" way without changing the "at-most-once"
> semantics?
>
> J.

Re: [DISCUSSION] let scheduler heal tasks stuck in queued state

Posted by Jarek Potiuk <ja...@potiuk.com>.
Shouldn't we simply make "_clear_stuck_queued_tasks" part of the executor
interface, and have the scheduler trigger it? I guess that
might solve it in a "portable" way without changing the "at-most-once"
semantics?

J.

On Tue, Mar 15, 2022 at 10:40 PM Andrew Godwin
<an...@astronomer.io.invalid> wrote:
>
> I agree this needs a core fix in Airflow, but I'd like to highlight that this is fundamentally changing the executor contract (as it changes tasks from at-most-once submission to at-least-once) and so not only would it need a very close level of testing, it would also be some level of breaking change - since Airflow allows you to plug in third party executors.
>
> Not that I'm against it, but we'd have to have a bit of a debate about what level of semver impact it would have. I imagine we could just about justify it as a minor-level change?
>
> Andrew

Re: [DISCUSSION] let scheduler heal tasks stuck in queued state

Posted by Andrew Godwin <an...@astronomer.io.INVALID>.
I agree this needs a core fix in Airflow, but I'd like to highlight that
this is fundamentally changing the executor contract (as it changes tasks
from at-most-once submission to at-least-once) and so not only would it
need a very close level of testing, it would also be some level of breaking
change - since Airflow allows you to plug in third party executors.

Not that I'm against it, but we'd have to have a bit of a debate about what
level of semver impact it would have. I imagine we could just about justify
it as a minor-level change?

Andrew

On Tue, Mar 15, 2022 at 3:33 PM Ping Zhang <pi...@umich.edu> wrote:

> Hi all,
>
> Currently, tasks can get stuck in the `queued` state, where they are neither
> rescheduled by the scheduler nor picked up by a worker. This can happen
> when a failure occurs after a task is marked as `queued` but before the
> executor marks it as `running`.
>
> There is a fix, https://github.com/apache/airflow/pull/19769/files, for the
> Celery executor. However, it only targets the CeleryExecutor, and it leaks
> scheduler responsibility into the executor.
>
> We propose to move the state reconciliation logic to the scheduler. The
> proposed change:
>
> 1. when getting the _executable_task_instances_to_queued (
> https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L287),
> also include the QUEUED state, like:
>     .filter(TI.state.in_([TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED]))
> In this way, the scheduler can process queued tis.
>
> 2. In the queue_command method in the base executor, conditionally queue
> the ti even if its state is QUEUED. The condition is based on the time the
> executor last enqueued it, with a configurable threshold.
>
> This could potentially send the same task to the executor twice. However,
> on the worker side there are already checks on whether a task can run or
> not, so this won't cause issues.
>
> We have been running this in our prod for a while and would love to
> contribute it.
>
> Please let me know your thoughts.
>
> Thanks,
>
> Ping
>