You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@airflow.apache.org by Dan Davydov <dd...@twitter.com.INVALID> on 2018/08/22 18:31:33 UTC

Why not mark inactive DAGs in the main scheduler loop?

I see some PRs creating endpoints to delete DAGs and other things related
to manually deleting DAGs from the DB, but is there a good reason why we
can't just move the deactivating DAG logic into the main scheduler loop?

The scheduler already has some code like this, but it only runs when the
Scheduler terminates:
      if all_files_processed:
            self.log.info(
                "Deactivating DAGs that haven't been touched since %s",
                execute_start_time.isoformat()
            )
            models.DAG.deactivate_stale_dags(execute_start_time)

Re: Why not mark inactive DAGs in the main scheduler loop?

Posted by Ruiqin Yang <yr...@gmail.com>.
I previously sent a proposal about scaling Airflow, I created Jira tickets
around that time. For this particular one, it is AIRFLOW-2760
<https://issues.apache.org/jira/browse/AIRFLOW-2760>. We've finished
testing it in Airbnb and plan to bake it for some time while I work on open
source the change.

Cheers,
Kevin Y

On Wed, Aug 22, 2018 at 6:45 PM Taylor Edmiston <te...@gmail.com> wrote:

> Kevin - Is there a Jira issue one can follow for this?
>
> On Wed, Aug 22, 2018 at 5:29 PM Ruiqin Yang <yr...@gmail.com> wrote:
>
> > I'm working on spliting the DAG parsing manager to a subprocess and with
> > that we don't need to worry about scheduler doing non-supervisor stuff
> nor
> > prolong scheduler loop duration. I can make a follow up PR to address
> this
> > once I have the original PR published if you guys don't have plan to work
> > on it in the very near future.
> >
> > Cheers,
> > Kevin Y
> >
> > On Wed, Aug 22, 2018 at 1:00 PM Dan Davydov <ddavydov@twitter.com.invalid
> >
> > wrote:
> >
> > > Agreed on delegation to a subprocess but I think that can come as part
> > of a
> > > larger redesign (maybe along with uploading DAG import errors etc). The
> > > query should be quite fast so it should not have a significant impact
> on
> > > the Scheduler times.
> > >
> > > On Wed, Aug 22, 2018 at 3:52 PM Maxime Beauchemin <
> > > maximebeauchemin@gmail.com> wrote:
> > >
> > > > I'd rather the scheduler delegate that to one of the minions
> > (subprocess)
> > > > if possible. We should keep everything we can off the main thread.
> > > >
> > > > BTW I've been speaking about renaming the scheduler to "supervisor"
> > for a
> > > > while now. While renaming may be a bit tricky (updating all
> references
> > in
> > > > the code), we should think of the scheduler as more of a supervisor
> as
> > it
> > > > takes on all sorts of supervision-related tasks.
> > > >
> > > > Tangent: we need to start thinking about allowing for a distributed
> > > > scheduler too, and I'm thinking we need to be careful around the
> tasks
> > > that
> > > > shouldn't be parallelized (this may or may not be one of them).
> We'll
> > > need
> > > > to do very basic leader election and taking/releasing locks while
> > running
> > > > these tasks. I'm thinking we can just set flags in the database to do
> > > that.
> > > >
> > > > Max
> > > >
> > > > On Wed, Aug 22, 2018 at 12:19 PM Taylor Edmiston <
> tedmiston@gmail.com>
> > > > wrote:
> > > >
> > > > > I'm not super familiar with this part of the scheduler.  What
> exactly
> > > are
> > > > > the implications of doing this mid-loop vs at scheduler
> termination?
> > > > > Is there a use case where DAGs hit this besides having been
> deleted?
> > > > >
> > > > > The deactivate_stale_dags call doesn't appear to be super expensive
> > or
> > > > > anything like that.
> > > > >
> > > > > This seems like a reasonable idea to me.
> > > > >
> > > > > *Taylor Edmiston*
> > > > > Blog <https://blog.tedmiston.com/> | CV
> > > > > <https://stackoverflow.com/cv/taylor> | LinkedIn
> > > > > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > > > > <https://angel.co/taylor> | Stack Overflow
> > > > > <https://stackoverflow.com/users/149428/taylor-edmiston>
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Aug 22, 2018 at 2:32 PM Dan Davydov
> > > <ddavydov@twitter.com.invalid
> > > > >
> > > > > wrote:
> > > > >
> > > > > > I see some PRs creating endpoints to delete DAGs and other things
> > > > related
> > > > > > to manually deleting DAGs from the DB, but is there a good reason
> > why
> > > > we
> > > > > > can't just move the deactivating DAG logic into the main
> scheduler
> > > > loop?
> > > > > >
> > > > > > The scheduler already has some code like this, but it only runs
> > when
> > > > the
> > > > > > Scheduler terminates:
> > > > > >       if all_files_processed:
> > > > > >             self.log.info(
> > > > > >                 "Deactivating DAGs that haven't been touched
> since
> > > %s",
> > > > > >                 execute_start_time.isoformat()
> > > > > >             )
> > > > > >             models.DAG.deactivate_stale_dags(execute_start_time)
> > > > > >
> > > > >
> > > >
> > >
> >
> --
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | CV
> <https://stackoverflow.com/cv/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor> | Stack Overflow
> <https://stackoverflow.com/users/149428/taylor-edmiston>
>

Re: Why not mark inactive DAGs in the main scheduler loop?

Posted by Taylor Edmiston <te...@gmail.com>.
Kevin - Is there a Jira issue one can follow for this?

On Wed, Aug 22, 2018 at 5:29 PM Ruiqin Yang <yr...@gmail.com> wrote:

> I'm working on spliting the DAG parsing manager to a subprocess and with
> that we don't need to worry about scheduler doing non-supervisor stuff nor
> prolong scheduler loop duration. I can make a follow up PR to address this
> once I have the original PR published if you guys don't have plan to work
> on it in the very near future.
>
> Cheers,
> Kevin Y
>
> On Wed, Aug 22, 2018 at 1:00 PM Dan Davydov <dd...@twitter.com.invalid>
> wrote:
>
> > Agreed on delegation to a subprocess but I think that can come as part
> of a
> > larger redesign (maybe along with uploading DAG import errors etc). The
> > query should be quite fast so it should not have a significant impact on
> > the Scheduler times.
> >
> > On Wed, Aug 22, 2018 at 3:52 PM Maxime Beauchemin <
> > maximebeauchemin@gmail.com> wrote:
> >
> > > I'd rather the scheduler delegate that to one of the minions
> (subprocess)
> > > if possible. We should keep everything we can off the main thread.
> > >
> > > BTW I've been speaking about renaming the scheduler to "supervisor"
> for a
> > > while now. While renaming may be a bit tricky (updating all references
> in
> > > the code), we should think of the scheduler as more of a supervisor as
> it
> > > takes on all sorts of supervision-related tasks.
> > >
> > > Tangent: we need to start thinking about allowing for a distributed
> > > scheduler too, and I'm thinking we need to be careful around the tasks
> > that
> > > shouldn't be parallelized (this may or may not be one of them).  We'll
> > need
> > > to do very basic leader election and taking/releasing locks while
> running
> > > these tasks. I'm thinking we can just set flags in the database to do
> > that.
> > >
> > > Max
> > >
> > > On Wed, Aug 22, 2018 at 12:19 PM Taylor Edmiston <te...@gmail.com>
> > > wrote:
> > >
> > > > I'm not super familiar with this part of the scheduler.  What exactly
> > are
> > > > the implications of doing this mid-loop vs at scheduler termination?
> > > > Is there a use case where DAGs hit this besides having been deleted?
> > > >
> > > > The deactivate_stale_dags call doesn't appear to be super expensive
> or
> > > > anything like that.
> > > >
> > > > This seems like a reasonable idea to me.
> > > >
> > > > *Taylor Edmiston*
> > > > Blog <https://blog.tedmiston.com/> | CV
> > > > <https://stackoverflow.com/cv/taylor> | LinkedIn
> > > > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > > > <https://angel.co/taylor> | Stack Overflow
> > > > <https://stackoverflow.com/users/149428/taylor-edmiston>
> > > >
> > > >
> > > >
> > > > On Wed, Aug 22, 2018 at 2:32 PM Dan Davydov
> > <ddavydov@twitter.com.invalid
> > > >
> > > > wrote:
> > > >
> > > > > I see some PRs creating endpoints to delete DAGs and other things
> > > related
> > > > > to manually deleting DAGs from the DB, but is there a good reason
> why
> > > we
> > > > > can't just move the deactivating DAG logic into the main scheduler
> > > loop?
> > > > >
> > > > > The scheduler already has some code like this, but it only runs
> when
> > > the
> > > > > Scheduler terminates:
> > > > >       if all_files_processed:
> > > > >             self.log.info(
> > > > >                 "Deactivating DAGs that haven't been touched since
> > %s",
> > > > >                 execute_start_time.isoformat()
> > > > >             )
> > > > >             models.DAG.deactivate_stale_dags(execute_start_time)
> > > > >
> > > >
> > >
> >
>
-- 
*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV
<https://stackoverflow.com/cv/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston>

Re: Why not mark inactive DAGs in the main scheduler loop?

Posted by Ruiqin Yang <yr...@gmail.com>.
I'm working on spliting the DAG parsing manager to a subprocess and with
that we don't need to worry about scheduler doing non-supervisor stuff nor
prolong scheduler loop duration. I can make a follow up PR to address this
once I have the original PR published if you guys don't have plan to work
on it in the very near future.

Cheers,
Kevin Y

On Wed, Aug 22, 2018 at 1:00 PM Dan Davydov <dd...@twitter.com.invalid>
wrote:

> Agreed on delegation to a subprocess but I think that can come as part of a
> larger redesign (maybe along with uploading DAG import errors etc). The
> query should be quite fast so it should not have a significant impact on
> the Scheduler times.
>
> On Wed, Aug 22, 2018 at 3:52 PM Maxime Beauchemin <
> maximebeauchemin@gmail.com> wrote:
>
> > I'd rather the scheduler delegate that to one of the minions (subprocess)
> > if possible. We should keep everything we can off the main thread.
> >
> > BTW I've been speaking about renaming the scheduler to "supervisor" for a
> > while now. While renaming may be a bit tricky (updating all references in
> > the code), we should think of the scheduler as more of a supervisor as it
> > takes on all sorts of supervision-related tasks.
> >
> > Tangent: we need to start thinking about allowing for a distributed
> > scheduler too, and I'm thinking we need to be careful around the tasks
> that
> > shouldn't be parallelized (this may or may not be one of them).  We'll
> need
> > to do very basic leader election and taking/releasing locks while running
> > these tasks. I'm thinking we can just set flags in the database to do
> that.
> >
> > Max
> >
> > On Wed, Aug 22, 2018 at 12:19 PM Taylor Edmiston <te...@gmail.com>
> > wrote:
> >
> > > I'm not super familiar with this part of the scheduler.  What exactly
> are
> > > the implications of doing this mid-loop vs at scheduler termination?
> > > Is there a use case where DAGs hit this besides having been deleted?
> > >
> > > The deactivate_stale_dags call doesn't appear to be super expensive or
> > > anything like that.
> > >
> > > This seems like a reasonable idea to me.
> > >
> > > *Taylor Edmiston*
> > > Blog <https://blog.tedmiston.com/> | CV
> > > <https://stackoverflow.com/cv/taylor> | LinkedIn
> > > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > > <https://angel.co/taylor> | Stack Overflow
> > > <https://stackoverflow.com/users/149428/taylor-edmiston>
> > >
> > >
> > >
> > > On Wed, Aug 22, 2018 at 2:32 PM Dan Davydov
> <ddavydov@twitter.com.invalid
> > >
> > > wrote:
> > >
> > > > I see some PRs creating endpoints to delete DAGs and other things
> > related
> > > > to manually deleting DAGs from the DB, but is there a good reason why
> > we
> > > > can't just move the deactivating DAG logic into the main scheduler
> > loop?
> > > >
> > > > The scheduler already has some code like this, but it only runs when
> > the
> > > > Scheduler terminates:
> > > >       if all_files_processed:
> > > >             self.log.info(
> > > >                 "Deactivating DAGs that haven't been touched since
> %s",
> > > >                 execute_start_time.isoformat()
> > > >             )
> > > >             models.DAG.deactivate_stale_dags(execute_start_time)
> > > >
> > >
> >
>

Re: Why not mark inactive DAGs in the main scheduler loop?

Posted by Dan Davydov <dd...@twitter.com.INVALID>.
Agreed on delegation to a subprocess but I think that can come as part of a
larger redesign (maybe along with uploading DAG import errors etc). The
query should be quite fast so it should not have a significant impact on
the Scheduler times.

On Wed, Aug 22, 2018 at 3:52 PM Maxime Beauchemin <
maximebeauchemin@gmail.com> wrote:

> I'd rather the scheduler delegate that to one of the minions (subprocess)
> if possible. We should keep everything we can off the main thread.
>
> BTW I've been speaking about renaming the scheduler to "supervisor" for a
> while now. While renaming may be a bit tricky (updating all references in
> the code), we should think of the scheduler as more of a supervisor as it
> takes on all sorts of supervision-related tasks.
>
> Tangent: we need to start thinking about allowing for a distributed
> scheduler too, and I'm thinking we need to be careful around the tasks that
> shouldn't be parallelized (this may or may not be one of them).  We'll need
> to do very basic leader election and taking/releasing locks while running
> these tasks. I'm thinking we can just set flags in the database to do that.
>
> Max
>
> On Wed, Aug 22, 2018 at 12:19 PM Taylor Edmiston <te...@gmail.com>
> wrote:
>
> > I'm not super familiar with this part of the scheduler.  What exactly are
> > the implications of doing this mid-loop vs at scheduler termination?
> > Is there a use case where DAGs hit this besides having been deleted?
> >
> > The deactivate_stale_dags call doesn't appear to be super expensive or
> > anything like that.
> >
> > This seems like a reasonable idea to me.
> >
> > *Taylor Edmiston*
> > Blog <https://blog.tedmiston.com/> | CV
> > <https://stackoverflow.com/cv/taylor> | LinkedIn
> > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > <https://angel.co/taylor> | Stack Overflow
> > <https://stackoverflow.com/users/149428/taylor-edmiston>
> >
> >
> >
> > On Wed, Aug 22, 2018 at 2:32 PM Dan Davydov <ddavydov@twitter.com.invalid
> >
> > wrote:
> >
> > > I see some PRs creating endpoints to delete DAGs and other things
> related
> > > to manually deleting DAGs from the DB, but is there a good reason why
> we
> > > can't just move the deactivating DAG logic into the main scheduler
> loop?
> > >
> > > The scheduler already has some code like this, but it only runs when
> the
> > > Scheduler terminates:
> > >       if all_files_processed:
> > >             self.log.info(
> > >                 "Deactivating DAGs that haven't been touched since %s",
> > >                 execute_start_time.isoformat()
> > >             )
> > >             models.DAG.deactivate_stale_dags(execute_start_time)
> > >
> >
>

Re: Why not mark inactive DAGs in the main scheduler loop?

Posted by Maxime Beauchemin <ma...@gmail.com>.
I'd rather the scheduler delegate that to one of the minions (subprocess)
if possible. We should keep everything we can off the main thread.

BTW I've been speaking about renaming the scheduler to "supervisor" for a
while now. While renaming may be a bit tricky (updating all references in
the code), we should think of the scheduler as more of a supervisor as it
takes on all sorts of supervision-related tasks.

Tangent: we need to start thinking about allowing for a distributed
scheduler too, and I'm thinking we need to be careful around the tasks that
shouldn't be parallelized (this may or may not be one of them).  We'll need
to do very basic leader election and taking/releasing locks while running
these tasks. I'm thinking we can just set flags in the database to do that.

Max

On Wed, Aug 22, 2018 at 12:19 PM Taylor Edmiston <te...@gmail.com>
wrote:

> I'm not super familiar with this part of the scheduler.  What exactly are
> the implications of doing this mid-loop vs at scheduler termination?
> Is there a use case where DAGs hit this besides having been deleted?
>
> The deactivate_stale_dags call doesn't appear to be super expensive or
> anything like that.
>
> This seems like a reasonable idea to me.
>
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | CV
> <https://stackoverflow.com/cv/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor> | Stack Overflow
> <https://stackoverflow.com/users/149428/taylor-edmiston>
>
>
>
> On Wed, Aug 22, 2018 at 2:32 PM Dan Davydov <dd...@twitter.com.invalid>
> wrote:
>
> > I see some PRs creating endpoints to delete DAGs and other things related
> > to manually deleting DAGs from the DB, but is there a good reason why we
> > can't just move the deactivating DAG logic into the main scheduler loop?
> >
> > The scheduler already has some code like this, but it only runs when the
> > Scheduler terminates:
> >       if all_files_processed:
> >             self.log.info(
> >                 "Deactivating DAGs that haven't been touched since %s",
> >                 execute_start_time.isoformat()
> >             )
> >             models.DAG.deactivate_stale_dags(execute_start_time)
> >
>

Re: Why not mark inactive DAGs in the main scheduler loop?

Posted by Taylor Edmiston <te...@gmail.com>.
I'm not super familiar with this part of the scheduler.  What exactly are
the implications of doing this mid-loop vs at scheduler termination?
Is there a use case where DAGs hit this besides having been deleted?

The deactivate_stale_dags call doesn't appear to be super expensive or
anything like that.

This seems like a reasonable idea to me.

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV
<https://stackoverflow.com/cv/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston>



On Wed, Aug 22, 2018 at 2:32 PM Dan Davydov <dd...@twitter.com.invalid>
wrote:

> I see some PRs creating endpoints to delete DAGs and other things related
> to manually deleting DAGs from the DB, but is there a good reason why we
> can't just move the deactivating DAG logic into the main scheduler loop?
>
> The scheduler already has some code like this, but it only runs when the
> Scheduler terminates:
>       if all_files_processed:
>             self.log.info(
>                 "Deactivating DAGs that haven't been touched since %s",
>                 execute_start_time.isoformat()
>             )
>             models.DAG.deactivate_stale_dags(execute_start_time)
>