Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (JIRA)" <ji...@apache.org> on 2019/06/21 09:37:00 UTC

[jira] [Resolved] (AIRFLOW-608) DAG.max_active_runs should consider runs with active/pending tasks as active

     [ https://issues.apache.org/jira/browse/AIRFLOW-608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor resolved AIRFLOW-608.
---------------------------------------
    Resolution: Fixed

This has now been fixed (not sure when), as the relevant part of {{update_state}} now looks like:

{code:python}
        if (not unfinished_tasks and
                any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
{code}

etc.
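Here is a simplified sketch of the fixed decision order. It assumes task states are plain lowercase strings. It reads the quoted {{unfinished_tasks}} check as "some task is still in an unfinished state", with an {{UNFINISHED}} set that approximates {{State.unfinished()}}. {{resolve_run_state}} and {{UNFINISHED}} are hypothetical illustrations, not Airflow APIs. Deadlock detection and callbacks are left out.

{code:python}
from typing import Dict, Iterable, Optional

# Illustrative lowercase names mirroring airflow.utils.state.State values.
FAILED, UPSTREAM_FAILED = "failed", "upstream_failed"
SUCCESS, SKIPPED, RUNNING = "success", "skipped", "running"
# Assumption: an approximation of State.unfinished(); None means "no state yet".
UNFINISHED = {None, "scheduled", "queued", "running", "up_for_retry"}


def resolve_run_state(task_states: Dict[str, Optional[str]],
                      root_ids: Iterable[str]) -> str:
    """Hypothetical, simplified version of the fixed update_state logic.

    task_states maps task_id -> task instance state; root_ids lists the
    tasks update_state treats as roots. Deadlock handling is omitted.
    """
    roots = [task_states[t] for t in root_ids]
    unfinished = [s for s in task_states.values() if s in UNFINISHED]
    # A failed root only fails the run once no task is left to run.
    if not unfinished and any(s in (FAILED, UPSTREAM_FAILED) for s in roots):
        return FAILED
    if not unfinished and all(s in (SUCCESS, SKIPPED) for s in roots):
        return SUCCESS
    # Anything still active or pending keeps the run RUNNING, so it
    # continues to count towards max_active_runs.
    return RUNNING


# A root has failed but another task is still running: the run stays active.
print(resolve_run_state({"extract": "failed", "load": "running"}, ["extract", "load"]))  # running
# Once nothing is unfinished, the failed root fails the whole run.
print(resolve_run_state({"extract": "failed", "load": "success"}, ["extract", "load"]))  # failed
{code}

The key point is the {{not unfinished}} guard on the failure branch. Without it, one failed root would flip the run out of RUNNING while other tasks are still executing.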

> DAG.max_active_runs should consider runs with active/pending tasks as active
> ----------------------------------------------------------------------------
>
>                 Key: AIRFLOW-608
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-608
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun, scheduler
>    Affects Versions: 1.7.1
>            Reporter: Casey Ching
>            Priority: Critical
>
> I expected DAG runs that have active or pending tasks to be counted towards DAG.max_active_runs, but that doesn't happen if the DAG run has a failed task.
> The code works as described; the issue is that the feature isn't as useful as it could be because of the way it accounts for failed tasks. There should be some way to limit concurrent DAG runs, where a DAG run counts as running if it has any active or pending tasks.
> max_active_runs is described as:
> {code}
>     :param max_active_runs: maximum number of active DAG runs, beyond this
>         number of DAG runs in a running state, the scheduler won't create
>         new active DAG runs
> {code}
> https://github.com/apache/incubator-airflow/blob/527e3ecdb8adf9d7a2f40c3ce2c30f2ccaf1924a/airflow/models.py#L2521
> and its usage is:
> {code}
>         if dag.schedule_interval:
>             active_runs = DagRun.find(
>                 dag_id=dag.dag_id,
>                 state=State.RUNNING,
>                 external_trigger=False,
>                 session=session
>             )
>             # return if already reached maximum active runs and no timeout setting
>             if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout:
>                 return
> {code}
> https://github.com/apache/incubator-airflow/blob/527e3ecdb8adf9d7a2f40c3ce2c30f2ccaf1924a/airflow/jobs.py#L689
> but the DAG run's state is no longer RUNNING once any of its root tasks has failed, even though other tasks could still be running:
> {code}
>     def update_state(self, session=None):
>         """
>         Determines the overall state of the DagRun based on the state
>         of its TaskInstances.
>         :returns State:
>         """
>         dag = self.get_dag()
>         tis = self.get_task_instances(session=session)
>         [...omitted...]
>         # future: remove the check on adhoc tasks (=active_tasks)
>         if len(tis) == len(dag.active_tasks):
>             # if any roots failed, the run failed
>             root_ids = [t.task_id for t in dag.roots]
>             roots = [t for t in tis if t.task_id in root_ids]
>             if any(r.state in (State.FAILED, State.UPSTREAM_FAILED)
>                    for r in roots):
>                 logging.info('Marking run {} failed'.format(self))
>                 self.state = State.FAILED
>             # if all roots succeeded, the run succeeded
>             elif all(r.state in (State.SUCCESS, State.SKIPPED)
>                      for r in roots):
>                 logging.info('Marking run {} successful'.format(self))
>                 self.state = State.SUCCESS
>             # if *all tasks* are deadlocked, the run failed
>             elif unfinished_tasks and none_depends_on_past and no_dependencies_met:
>                 logging.info(
>                     'Deadlock; marking run {} failed'.format(self))
>                 self.state = State.FAILED
>             # finally, if the roots aren't done, the dag is still running
>             else:
>                 self.state = State.RUNNING
> {code}
> https://github.com/apache/incubator-airflow/blob/527e3ecdb8adf9d7a2f40c3ce2c30f2ccaf1924a/airflow/models.py#L3800
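The scheduler-side effect can be shown without Airflow at all. The sketch below uses hypothetical {{Run}}, {{count_by_run_state}}, {{count_with_unfinished_tasks}} and {{may_create_run}} helpers; none of them are Airflow APIs, and the unfinished-state set is an assumed approximation of {{State.unfinished()}}. It reproduces the gate from the quoted {{jobs.py}} code. It then compares two ways of counting active runs: by run-level state, and by the reporter's proposed definition.

{code:python}
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Assumption: approximates State.unfinished(); lowercase names are illustrative.
UNFINISHED = {None, "scheduled", "queued", "running", "up_for_retry"}


@dataclass
class Run:
    """Hypothetical stand-in for a DagRun: its own state plus task states."""
    state: str
    task_states: Dict[str, Optional[str]] = field(default_factory=dict)


def count_by_run_state(runs: List[Run]) -> int:
    # Like DagRun.find(state=State.RUNNING): only the run-level state matters.
    return sum(1 for r in runs if r.state == "running")


def count_with_unfinished_tasks(runs: List[Run]) -> int:
    # Proposed definition: any active or pending task makes the run active.
    return sum(1 for r in runs
               if any(s in UNFINISHED for s in r.task_states.values()))


def may_create_run(active: int, max_active_runs: int, dagrun_timeout=None) -> bool:
    # Same gate as the quoted scheduler code: stop only when the limit is
    # reached and no dagrun_timeout is configured.
    return not (active >= max_active_runs and not dagrun_timeout)


# A run already marked failed (a root failed) while slow_task still runs.
runs = [Run("failed", {"root_a": "failed", "slow_task": "running"})]
print(may_create_run(count_by_run_state(runs), max_active_runs=1))           # True
print(may_create_run(count_with_unfinished_tasks(runs), max_active_runs=1))  # False
{code}

With the run-level count, a run marked failed while {{slow_task}} is still executing frees its slot. The scheduler would then start another run despite {{max_active_runs=1}}. Counting unfinished tasks keeps the slot occupied.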



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)