Posted to commits@airflow.apache.org by "Casey Ching (JIRA)" <ji...@apache.org> on 2016/10/31 19:40:58 UTC
[jira] [Created] (AIRFLOW-608) DAG.max_active_runs should consider runs with active/pending tasks as active
Casey Ching created AIRFLOW-608:
-----------------------------------
Summary: DAG.max_active_runs should consider runs with active/pending tasks as active
Key: AIRFLOW-608
URL: https://issues.apache.org/jira/browse/AIRFLOW-608
Project: Apache Airflow
Issue Type: Bug
Components: DagRun, scheduler
Affects Versions: Airflow 1.7.1
Reporter: Casey Ching
Priority: Critical
I expected dag runs that have active or pending tasks to be counted towards DAG.max_active_runs, but that doesn't happen if the dag run has a failed task.
The code works as described. The issue is that the feature isn't as useful as it could be because of the way it accounts for failed tasks. There should be some way to limit concurrent dag runs where "running" means that a dag run has any active or pending tasks.
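A minimal sketch of the definition proposed above, not Airflow code. The state names are plain strings modeled on Airflow's task states, and the choice of which states count as "active or pending" (ACTIVE_OR_PENDING) is an assumption made for illustration; run_is_active and count_active_runs are hypothetical helpers.

```python
# Hypothetical set of task states treated as "active or pending".
# Plain strings are used so the sketch runs without Airflow installed.
ACTIVE_OR_PENDING = {"scheduled", "queued", "running", "up_for_retry"}


def run_is_active(task_states):
    """A run is active if at least one of its tasks is active or pending."""
    return any(state in ACTIVE_OR_PENDING for state in task_states)


def count_active_runs(runs):
    """Count active runs; `runs` maps a run id to a list of task states."""
    return sum(1 for states in runs.values() if run_is_active(states))


runs = {
    "run_1": ["failed", "running"],   # has a failed task, but still working
    "run_2": ["success", "success"],  # finished
    "run_3": ["queued", "success"],   # still has pending work
}
print(count_active_runs(runs))  # 2
```

Under this definition run_1 still counts towards the limit even though one of its tasks has failed.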
max_active_runs is described as
{code}
:param max_active_runs: maximum number of active DAG runs, beyond this
    number of DAG runs in a running state, the scheduler won't create
    new active DAG runs
{code}
https://github.com/apache/incubator-airflow/blob/527e3ecdb8adf9d7a2f40c3ce2c30f2ccaf1924a/airflow/models.py#L2521
and its usage is
{code}
if dag.schedule_interval:
    active_runs = DagRun.find(
        dag_id=dag.dag_id,
        state=State.RUNNING,
        external_trigger=False,
        session=session
    )
    # return if already reached maximum active runs and no timeout setting
    if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout:
        return
{code}
https://github.com/apache/incubator-airflow/blob/527e3ecdb8adf9d7a2f40c3ce2c30f2ccaf1924a/airflow/jobs.py#L689
but the dag run's state is no longer RUNNING once any of its root tasks is FAILED or UPSTREAM_FAILED, even though other tasks could still be running.
{code}
def update_state(self, session=None):
    """
    Determines the overall state of the DagRun based on the state
    of its TaskInstances.
    :returns State:
    """
    dag = self.get_dag()
    tis = self.get_task_instances(session=session)
    [...omitted...]
    # future: remove the check on adhoc tasks (=active_tasks)
    if len(tis) == len(dag.active_tasks):
        # if any roots failed, the run failed
        root_ids = [t.task_id for t in dag.roots]
        roots = [t for t in tis if t.task_id in root_ids]
        if any(r.state in (State.FAILED, State.UPSTREAM_FAILED)
               for r in roots):
            logging.info('Marking run {} failed'.format(self))
            self.state = State.FAILED
        # if all roots succeeded, the run succeeded
        elif all(r.state in (State.SUCCESS, State.SKIPPED)
                 for r in roots):
            logging.info('Marking run {} successful'.format(self))
            self.state = State.SUCCESS
        # if *all tasks* are deadlocked, the run failed
        elif unfinished_tasks and none_depends_on_past and no_dependencies_met:
            logging.info(
                'Deadlock; marking run {} failed'.format(self))
            self.state = State.FAILED
        # finally, if the roots aren't done, the dag is still running
        else:
            self.state = State.RUNNING
{code}
https://github.com/apache/incubator-airflow/blob/527e3ecdb8adf9d7a2f40c3ce2c30f2ccaf1924a/airflow/models.py#L3800
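To illustrate the gap, here is a simplified, standalone model of the branch order in update_state quoted above. It is a sketch rather than the Airflow implementation: the deadlock branch is left out, states are plain strings, and derive_run_state and counted_runs are hypothetical names introduced for this example.

```python
FAILED_STATES = ("failed", "upstream_failed")
DONE_STATES = ("success", "skipped")


def derive_run_state(root_states):
    """Mirror the quoted branch order: failure is checked before anything else."""
    if any(s in FAILED_STATES for s in root_states):
        return "failed"
    if all(s in DONE_STATES for s in root_states):
        return "success"
    return "running"


def counted_runs(runs):
    """Mimic the scheduler filter: only runs in the 'running' state count."""
    return [run_id for run_id, roots in runs.items()
            if derive_run_state(roots) == "running"]


runs = {
    "run_a": ["failed", "running"],   # one root failed, another still executing
    "run_b": ["running", "success"],  # no failures yet
}
print(counted_runs(runs))  # ['run_b']
```

run_a still has a task executing, but because it is marked failed it is not counted against max_active_runs, so the scheduler is free to create another run.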
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)