Posted to commits@airflow.apache.org by "Boris Tyukin (JIRA)" <ji...@apache.org> on 2017/03/18 20:18:41 UTC

[jira] [Commented] (AIRFLOW-1008) option to prevent a dag from running concurrently

    [ https://issues.apache.org/jira/browse/AIRFLOW-1008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931369#comment-15931369 ] 

Boris Tyukin commented on AIRFLOW-1008:
---------------------------------------

A workaround from F. Hakan Koklu:
------------------------
I had a similar problem and solved it by querying the dag_run table to
make sure the DAG is not already running before kicking it off with
TriggerDagRunOperator. It has been running for months now without any
issues, so I highly recommend it.

I added a utility function that I call before doing the trigger. Here it
is:


from airflow import models as af_models


def is_dag_running(dag_id):
    """Return True if the DAG with the given dag_id currently has a running DagRun."""
    with session_scope() as session:
        # Count DagRun rows for this dag_id that are still in the 'running' state.
        result = session.query(af_models.DagRun)\
            .filter(af_models.DagRun.dag_id == dag_id,
                    af_models.DagRun.state == 'running')\
            .count()
    return bool(result)

session_scope is a context manager that simply yields a session from
airflow.settings.Session and cleans it up afterwards. Hope this helps.
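
For reference, a minimal sketch of what such a session_scope helper could
look like, assuming the standard contextlib pattern around Airflow's
settings.Session sessionmaker (the exact cleanup details of the original
helper are not shown in the comment):

from contextlib import contextmanager

from airflow import settings


@contextmanager
def session_scope():
    """Yield a SQLAlchemy session and clean it up when the block exits."""
    session = settings.Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

With that in place, the check can be wired into TriggerDagRunOperator
(Airflow 1.8), whose python_callable may return None to skip the trigger.
A rough usage sketch, with 'target_dag' and the surrounding dag object as
placeholders:

from airflow.operators.dagrun_operator import TriggerDagRunOperator


def conditionally_trigger(context, dag_run_obj):
    # Only trigger the target DAG if it is not already running.
    if not is_dag_running('target_dag'):
        return dag_run_obj
    return None  # returning None makes the operator skip the trigger


trigger = TriggerDagRunOperator(
    task_id='trigger_target_dag',
    trigger_dag_id='target_dag',
    python_callable=conditionally_trigger,
    dag=dag,  # assumed to be defined elsewhere in the DAG file
)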

> option to prevent a dag from running concurrently
> -------------------------------------------------
>
>                 Key: AIRFLOW-1008
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1008
>             Project: Apache Airflow
>          Issue Type: Improvement
>    Affects Versions: Airflow 1.8
>            Reporter: Boris Tyukin
>            Priority: Minor
>
> It would be nice to have an option to prevent a DAG from being scheduled or run concurrently, whether it is kicked off by the scheduler or triggered externally (via the Airflow CLI or the UI).
> max_active_runs only applies to scheduled runs, not to runs started from the CLI or triggered by another task.
> I see this is done intentionally in dagrun_exists_dep.py:
>  running_dagruns = DagRun.find(
>                 dag_id=dag.dag_id,
>                 state=State.RUNNING,
>                 external_trigger=False,
>                 session=session
>             )
> I've looked at depends_on_past=True but it does not really help in this case.
> I am posting some comments from Max below as well:
> Without looking at the latest code to confirm what I'm about to write,
> `max_active_runs` really only prevents the scheduler from creating new
> active DAG runs. For `max_active_runs` to apply to externally triggered
> runs, we'd need to introduce a new `scheduled` status for DAG runs. The
> scheduler would then handle the simple new task of flipping the status
> from `scheduled` to `running` whenever `actual_active_dag_runs <
> max_active_runs`. We'd probably want the CLI command and the UI DAG run
> creation process to default the DAG run status to this new `scheduled`
> state.
> I think it should be a fairly simple feature to add in.
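
For illustration only, a rough sketch of the scheduled-to-running flip Max
describes above; the 'scheduled' state, the promote_scheduled_dagruns
helper, and its integration into the scheduler are all assumptions
(Airflow 1.8 has no such state):

from airflow.models import DagRun
from airflow.utils.state import State


def promote_scheduled_dagruns(dag, session):
    # Hypothetical scheduler step: promote 'scheduled' DAG runs to
    # 'running' only while the number of active runs stays below
    # max_active_runs.
    running = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING,
                          session=session)
    scheduled = DagRun.find(dag_id=dag.dag_id, state='scheduled',
                            session=session)
    free_slots = dag.max_active_runs - len(running)
    for dag_run in scheduled[:max(free_slots, 0)]:
        dag_run.state = State.RUNNING  # flip scheduled -> running
        session.merge(dag_run)
    session.commit()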



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)