Posted to dev@airflow.apache.org by Boris Tyukin <bo...@boristyukin.com> on 2017/01/17 15:18:57 UTC

question about max_active_runs or how to force only one DAG run at a time

Hello,

I would like to prevent a DAG from being scheduled or run concurrently,
whether by the scheduler or when triggered externally (via airflow
trigger_dag or the UI).

I tried setting max_active_runs to 1, but it did not work for the externally
triggered case - Airflow would still go ahead and execute multiple DAG runs.

I see this is done intentionally in dagrun_exists_dep.py:

running_dagruns = DagRun.find(
    dag_id=dag.dag_id,
    state=State.RUNNING,
    external_trigger=False,  # <-- externally triggered runs are deliberately excluded
    session=session
)

Is there any other way of forcing only one DAG run at a time?

I am aware of depends_on_past=True, but I use BranchPythonOperator and
that setting does not make sense in my case.
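
For illustration, the same DagRun.find API can be used from inside the DAG
itself: a guard task that short-circuits the run when another run is already
active. A minimal sketch, assuming Airflow 1.x import paths (the my_dag name
and task id are illustrative):

from datetime import datetime

from airflow import DAG, settings
from airflow.models import DagRun
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.state import State

dag = DAG('my_dag', start_date=datetime(2017, 1, 1), schedule_interval='@daily')

def no_other_active_runs(**context):
    """Return False (skipping downstream tasks) if another run is RUNNING."""
    session = settings.Session()
    try:
        running = DagRun.find(dag_id=context['dag'].dag_id,
                              state=State.RUNNING,
                              session=session)
    finally:
        session.close()
    # The current run is itself RUNNING, so more than one means concurrency.
    return len(running) <= 1

guard = ShortCircuitOperator(task_id='ensure_single_active_run',
                             python_callable=no_other_active_runs,
                             provide_context=True,
                             dag=dag)
# the real work would be chained downstream of `guard`

With this in place a second run, scheduled or externally triggered, still
starts, but it skips all of its downstream tasks instead of doing work
concurrently.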

Re: question about max_active_runs or how to force only one DAG run at a time

Posted by Maxime Beauchemin <ma...@gmail.com>.
Without looking at the latest code to confirm what I'm about to write,
`max_active_runs` really only prevents the scheduler from creating new
active DAG runs. For `max_active_runs` to apply to externally triggered
runs, we'd need to introduce a new `scheduled` status for DAG runs. The
scheduler would then take on the simple new task of flipping a run from
`scheduled` to `running` whenever `actual_active_dag_runs <
max_active_runs`. We'd probably want the CLI command and the UI's DAG run
creation process to default newly created DAG runs to this `scheduled`
state.
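
To make that concrete, a rough sketch of the promotion step - purely
hypothetical code, since neither this helper nor a `scheduled` DAG run
state exists in Airflow today:

from airflow.models import DagRun
from airflow.utils.state import State

def promote_scheduled_runs(dag, session):
    """Hypothetical: flip 'scheduled' runs to RUNNING while under the cap."""
    active = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
    waiting = DagRun.find(dag_id=dag.dag_id, state='scheduled', session=session)
    free_slots = max(dag.max_active_runs - len(active), 0)
    for run in waiting[:free_slots]:
        run.state = State.RUNNING
        session.merge(run)
    session.commit()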

I think it should be a fairly simple feature to add in.

Max


Re: question about max_active_runs or how to force only one DAG run at a time

Posted by Boris Tyukin <bo...@boristyukin.com>.
Interesting - thanks, Bolke, for giving me this idea!


Re: question about max_active_runs or how to force only one DAG run at a time

Posted by Bolke de Bruin <bd...@gmail.com>.
Well yes, I guess so. You can actually create pools from a DAG definition as well (I personally don’t like that for operational security reasons), but yes, this works. So you could verify whether the pool exists for a certain DAG and create it if it doesn’t.
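
A minimal sketch of that check-and-create step, assuming the Airflow 1.x
Pool model and settings.Session (the pool name below is illustrative):

from airflow import settings
from airflow.models import Pool

def create_pool_if_missing(name, slots=1, description=''):
    """Idempotent, so it is safe to call on every parse of a DAG file."""
    session = settings.Session()
    try:
        if not session.query(Pool).filter(Pool.pool == name).first():
            session.add(Pool(pool=name, slots=slots, description=description))
            session.commit()
    finally:
        session.close()

# e.g. one single-slot pool per generated DAG:
create_pool_if_missing('my_dag_pool', slots=1, description='serialize my_dag')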

Bolke


Re: question about max_active_runs or how to force only one DAG run at a time

Posted by Boris Tyukin <bo...@boristyukin.com>.
Interesting, but I have 200 DAGs like that - I generate them
programmatically. Does that mean I need a different pool for each one of
them? Can I define pools outside the UI? Doing that by hand would be painful :)


Re: question about max_active_runs or how to force only one DAG run at a time

Posted by Bolke de Bruin <bd...@gmail.com>.
You could use a pool, if you are fine with the “second” DAG run being run later.
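
A minimal sketch of the pool approach, assuming a single-slot pool named
my_dag_pool already exists (created in the UI or programmatically, as
discussed above):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('my_dag', start_date=datetime(2017, 1, 1), schedule_interval='@daily')

# Every task draws from the same single-slot pool, so tasks belonging to a
# "second" run queue up until the slot frees.
extract = BashOperator(task_id='extract', bash_command='echo extract',
                       pool='my_dag_pool', dag=dag)
load = BashOperator(task_id='load', bash_command='echo load',
                    pool='my_dag_pool', dag=dag)
extract.set_downstream(load)

Note that this serializes tasks, not whole runs: with one slot, tasks from
overlapping runs can still interleave one at a time.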

