Posted to commits@airflow.apache.org by "Casey Ching (JIRA)" <ji...@apache.org> on 2016/10/31 19:40:58 UTC

[jira] [Created] (AIRFLOW-608) DAG.max_active_runs should consider runs with active/pending tasks as active

Casey Ching created AIRFLOW-608:
-----------------------------------

             Summary: DAG.max_active_runs should consider runs with active/pending tasks as active
                 Key: AIRFLOW-608
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-608
             Project: Apache Airflow
          Issue Type: Bug
          Components: DagRun, scheduler
    Affects Versions: Airflow 1.7.1
            Reporter: Casey Ching
            Priority: Critical


I expected dag runs that have active or pending tasks to be counted towards DAG.max_active_runs, but that doesn't happen if the dag run has a failed task.

The code works as described; the issue is that the feature isn't as useful as it could be because of the way failed tasks are accounted for. There should be some way to limit concurrent dag runs, where "running" means a dag run has any active or pending tasks.
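
For illustration, a minimal sketch of that definition of "active" (the helper name and the exact set of unfinished states are my assumptions, not existing Airflow code):

{code}
def run_has_unfinished_tasks(dag_run, session=None):
    # Hypothetical helper: treat a run as active whenever any of its
    # task instances is still pending, queued, running or retryable,
    # regardless of the DagRun's own recorded state.
    unfinished = (State.NONE, State.QUEUED, State.RUNNING, State.UP_FOR_RETRY)
    return any(ti.state in unfinished
               for ti in dag_run.get_task_instances(session=session))
{code}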

max_active_runs is described as:

{code}
    :param max_active_runs: maximum number of active DAG runs, beyond this
        number of DAG runs in a running state, the scheduler won't create
        new active DAG runs
{code}
https://github.com/apache/incubator-airflow/blob/527e3ecdb8adf9d7a2f40c3ce2c30f2ccaf1924a/airflow/models.py#L2521
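
For context, a dag might set it like this (the dag id, dates and schedule here are arbitrary):

{code}
from datetime import datetime, timedelta
from airflow import DAG

dag = DAG(
    dag_id='example_max_active_runs',      # arbitrary id for illustration
    start_date=datetime(2016, 10, 1),
    schedule_interval=timedelta(hours=1),
    max_active_runs=1,  # scheduler shouldn't start a 2nd run while one is "running"
)
{code}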

and its usage is:

{code}
        if dag.schedule_interval:
            active_runs = DagRun.find(
                dag_id=dag.dag_id,
                state=State.RUNNING,
                external_trigger=False,
                session=session
            )
            # return if already reached maximum active runs and no timeout setting
            if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout:
                return
{code}

https://github.com/apache/incubator-airflow/blob/527e3ecdb8adf9d7a2f40c3ce2c30f2ccaf1924a/airflow/jobs.py#L689

but the state is not considered RUNNING if any of the DAG's root tasks has failed, even though other tasks could still be running:

{code}
    def update_state(self, session=None):
        """
        Determines the overall state of the DagRun based on the state
        of its TaskInstances.
        :returns State:
        """

        dag = self.get_dag()
        tis = self.get_task_instances(session=session)

        [...omitted...]

        # future: remove the check on adhoc tasks (=active_tasks)
        if len(tis) == len(dag.active_tasks):
            # if any roots failed, the run failed
            root_ids = [t.task_id for t in dag.roots]
            roots = [t for t in tis if t.task_id in root_ids]

            if any(r.state in (State.FAILED, State.UPSTREAM_FAILED)
                   for r in roots):
                logging.info('Marking run {} failed'.format(self))
                self.state = State.FAILED

            # if all roots succeeded, the run succeeded
            elif all(r.state in (State.SUCCESS, State.SKIPPED)
                     for r in roots):
                logging.info('Marking run {} successful'.format(self))
                self.state = State.SUCCESS

            # if *all tasks* are deadlocked, the run failed
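            # (unfinished_tasks, none_depends_on_past and no_dependencies_met
            # are computed in the section omitted above)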
            elif unfinished_tasks and none_depends_on_past and no_dependencies_met:
                logging.info(
                    'Deadlock; marking run {} failed'.format(self))
                self.state = State.FAILED

            # finally, if the roots aren't done, the dag is still running
            else:
                self.state = State.RUNNING
{code}

https://github.com/apache/incubator-airflow/blob/527e3ecdb8adf9d7a2f40c3ce2c30f2ccaf1924a/airflow/models.py#L3800
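
Putting it together, here's a hypothetical reproduction (task ids and commands are made up). With no downstream dependents, both tasks are "roots" in update_state's sense, so as soon as fail_fast fails the run is marked FAILED while long_running is still executing, and the run drops out of the State.RUNNING query above:

{code}
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='max_active_runs_gap',
    start_date=datetime(2016, 10, 1),
    schedule_interval=timedelta(hours=1),
    max_active_runs=1,
)

# Two independent tasks; neither has downstream dependents, so both are roots.
fail_fast = BashOperator(task_id='fail_fast',
                         bash_command='exit 1',      # fails immediately
                         dag=dag)
long_running = BashOperator(task_id='long_running',
                            bash_command='sleep 3600',  # still running after the failure
                            dag=dag)
{code}

With max_active_runs=1, the scheduler is then free to start a second run while long_running from the first run is still occupying a worker slot.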

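One possible direction, sketched against the excerpt above rather than a tested patch: only flip the run to FAILED once no task instances remain unfinished, so a run keeps counting toward max_active_runs while any task is still active or pending.

{code}
            # Sketch only: also require that no task instances remain
            # unfinished before declaring the run failed (unfinished_tasks
            # comes from the omitted part of update_state above).
            if (not unfinished_tasks and
                    any(r.state in (State.FAILED, State.UPSTREAM_FAILED)
                        for r in roots)):
                logging.info('Marking run {} failed'.format(self))
                self.state = State.FAILED
{code}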

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)