Posted to commits@airflow.apache.org by "Boris Tyukin (JIRA)" <ji...@apache.org> on 2017/03/18 20:18:41 UTC
[jira] [Commented] (AIRFLOW-1008) option to prevent a dag from running concurrently
[ https://issues.apache.org/jira/browse/AIRFLOW-1008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931369#comment-15931369 ]
Boris Tyukin commented on AIRFLOW-1008:
---------------------------------------
A workaround from F. Hakan Koklu:
------------------------
I had a similar problem and solved it by querying the dag_run table to
make sure the DAG is not already running, and only then kicking it off with
TriggerDagRunOperator. It has been running for months now without any
issues, so I highly recommend it. I added a utility function that I call
before doing the trigger. Here it is:
    import airflow.models as af_models


    def is_dag_running(dag_id):
        """Return True if the DAG with this dag_id has a run in the 'running' state."""
        # session_scope is the author's own helper, described below
        with session_scope() as session:
            result = (session.query(af_models.DagRun)
                      .filter(af_models.DagRun.dag_id == dag_id,
                              af_models.DagRun.state == 'running')
                      .count())
        return bool(result)
session_scope is a context manager that simply yields a session created from
airflow.settings.Session and cleans up afterwards. Hope this helps.
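The commenter did not post session_scope itself, so here is a minimal sketch of such a helper. It assumes airflow.settings.Session is the configured SQLAlchemy session factory; the commit-on-success and rollback-on-error steps are the common SQLAlchemy pattern and an assumption here (a read-only check like is_dag_running only really needs the close). The optional session_factory parameter is hypothetical and exists only so the helper can be exercised without a running Airflow install.

```python
from contextlib import contextmanager


@contextmanager
def session_scope(session_factory=None):
    """Yield an ORM session, then commit or roll back, and always close it."""
    if session_factory is None:
        # Assumption: Airflow's SQLAlchemy session factory, imported lazily.
        from airflow import settings
        session_factory = settings.Session
    session = session_factory()
    try:
        yield session
        session.commit()  # persist work done inside the with-block
    except Exception:
        session.rollback()  # undo partial work, then re-raise the error
        raise
    finally:
        session.close()  # return the connection to the pool
```

With this in place, `with session_scope() as session:` in is_dag_running works as written.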
> option to prevent a dag from running concurrently
> -------------------------------------------------
>
> Key: AIRFLOW-1008
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1008
> Project: Apache Airflow
> Issue Type: Improvement
> Affects Versions: Airflow 1.8
> Reporter: Boris Tyukin
> Priority: Minor
>
> Would be nice to have an option to prevent a DAG from being scheduled or running concurrently, whether it is kicked off by the scheduler or triggered externally (via the Airflow CLI or UI).
> max_active_runs only applies to scheduled runs, not to runs started from the CLI or triggered by another task.
> I see this is done intentionally in dagrun_exists_dep.py:
>     running_dagruns = DagRun.find(
>         dag_id=dag.dag_id,
>         state=State.RUNNING,
>         external_trigger=False,
>         session=session
>     )
> I've looked at depends_on_past=True but it does not really help in this case.
> Below are some comments from Max as well:
> Without looking at the latest code to confirm what I'm about to write,
> `max_active_runs` really only prevents the scheduler from creating new
> active DAG runs. For `max_active_runs` to apply to externally triggered
> runs, we'd need to introduce handling of a new `scheduled` status for DAG
> runs. The scheduler would then have the simple task of flipping the
> status from `scheduled` to `running` when
> `actual_active_dag_runs < max_active_runs`. We'd probably want the CLI
> command and the UI DAG run creation process to default the DAG run status
> to this new `scheduled` state.
> I think it should be a fairly simple feature to add.
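Max's proposal can be modelled in a few lines of plain Python. This is a sketch of the proposed behaviour, not Airflow 1.8 code: ProposedRun and promote_scheduled are hypothetical names, the `scheduled` state comes from the comment, and promoting waiting runs oldest-first is an assumption. Unlike the DagRun.find call quoted in the issue, which passes external_trigger=False, the count here includes externally triggered runs, which is what the issue asks for.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ProposedRun:
    """Hypothetical DAG run record used only for this sketch."""
    run_id: str
    state: str              # 'scheduled', 'running', 'success' or 'failed'
    external_trigger: bool  # kept to show it is deliberately NOT filtered on
    created_order: int      # lower value = created earlier


def promote_scheduled(runs: List[ProposedRun], max_active_runs: int) -> List[str]:
    """Flip the oldest 'scheduled' runs to 'running' while under the limit.

    Every running run counts toward max_active_runs, whether the scheduler
    created it or it was triggered externally. Returns the promoted run_ids.
    """
    active = sum(1 for r in runs if r.state == "running")
    waiting = sorted(
        (r for r in runs if r.state == "scheduled"),
        key=lambda r: r.created_order,
    )
    promoted = []
    for run in waiting:
        if active >= max_active_runs:
            break  # limit reached; the rest wait for a later scheduler pass
        run.state = "running"
        active += 1
        promoted.append(run.run_id)
    return promoted
```

With max_active_runs=1, a manually triggered run that is still running keeps the next scheduled run waiting; once it finishes, the next scheduler pass promotes the oldest waiting run.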