Posted to commits@airflow.apache.org by "Gerard Toonstra (JIRA)" <ji...@apache.org> on 2016/11/06 14:36:58 UTC

[jira] [Comment Edited] (AIRFLOW-137) Airflow does not respect 'max_active_runs' when task from multiple dag runs cleared

    [ https://issues.apache.org/jira/browse/AIRFLOW-137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15641891#comment-15641891 ] 

Gerard Toonstra edited comment on AIRFLOW-137 at 11/6/16 2:36 PM:
------------------------------------------------------------------

When dagruns already exist and their tasks are cleared, the dagruns are put back into the "RUNNING" state by "clear_task_instances". Because the scheduler does not look at "max_active_runs" when scheduling task instances for existing dagruns, it ends up violating that setting.

--- in-depth analysis ---

SchedulerJob.create_dag_run creates a new dag run for a dag only as long as the number of active runs does not exceed the max_active_runs parameter. It checks this by querying for dagruns in the "RUNNING" state.
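
A minimal sketch of that check, using illustrative names and plain data structures rather than the actual SchedulerJob code, might look like this:

    # Illustrative sketch only; not the real SchedulerJob.create_dag_run.
    def create_dag_run(dag, dag_runs, execution_date):
        # Count this dag's runs that are currently in the RUNNING state.
        active = [r for r in dag_runs
                  if r["dag_id"] == dag["dag_id"] and r["state"] == "RUNNING"]
        if len(active) >= dag["max_active_runs"]:
            return None  # limit reached, do not create another run
        run = {"dag_id": dag["dag_id"], "execution_date": execution_date,
               "state": "RUNNING"}
        dag_runs.append(run)
        return run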

It is called from the FileProcessor, which runs every n seconds and calls "process_dags" for every dag it finds and processes.

These functions are the part of the scheduler that actually picks up new work to perform.
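
Schematically, and reusing the create_dag_run sketch above together with a process_task_instances sketch shown a little further down, the cycle behaves roughly like this (the interval and data structures are simplified assumptions):

    import time

    # Simplified picture of the scheduling cycle (illustration only; the
    # real loop lives in the scheduler's file processing code).
    def run_scheduler(dags, dag_runs, task_queue, interval_seconds=5):
        while True:
            for dag in dags:
                # create_dag_run checks max_active_runs (see the sketch above)...
                create_dag_run(dag, dag_runs, dag["next_execution_date"])
                # ...process_task_instances does not (see the sketch below).
                process_task_instances(dag, dag_runs, task_queue)
            time.sleep(interval_seconds)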

"process_dags" calls "process_task_instances", which only looks at active dagruns to schedule or queue new task instances to execute. It no longer looks at the max_active_runs
parameter, because the way it's used, it is only looking at that parameter when creating new dagruns, not when they are executed or have executed.
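
A sketch of that behaviour in the same illustrative style (not the actual process_task_instances code):

    # Illustrative sketch: task instances are scheduled for every dagrun in
    # the RUNNING state, with no check against max_active_runs at this point.
    def process_task_instances(dag, dag_runs, task_queue):
        running = [r for r in dag_runs
                   if r["dag_id"] == dag["dag_id"] and r["state"] == "RUNNING"]
        for run in running:
            scheduled = run.setdefault("scheduled", set())
            for task_id in dag["task_ids"]:
                if task_id not in scheduled:
                    task_queue.append((dag["dag_id"], task_id, run["execution_date"]))
                    scheduled.add(task_id)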

Clearing tasks in the UI goes through the "clear" view in "views.py". That in turn calls "dag.clear", which has reset_dag_runs=True as its default; the view does not override that setting.

"dag.clear" gets a list of task instances that should be cleared. Then it calls "clear_task_instances" in models.py, which removes those task instance records and also gets a list of all execution_dates for which execution dates were removed. It uses that list of execution dates to reset the dagruns to RUNNING.

The "reset_dag_runs=True" parameter eventually causes a call to "set_dag_runs_state" (default RUNNING), which deals with DagStat records on the basis of dirty flags in a dag.

When the scheduler's next cycle comes around, the dagruns whose tasks were cleared are found in the RUNNING state with missing task instance entries. The result is that task instances for all of those execution dates are scheduled in at once, regardless of max_active_runs.
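
One way to picture the missing guard is a variant of the process_task_instances sketch above that caps the number of runs receiving new work; this is only an illustration of where a check could live, not a proposed patch:

    # Sketch: same loop as the process_task_instances sketch above, but only
    # the first max_active_runs RUNNING dagruns receive new work.
    def process_task_instances_guarded(dag, dag_runs, task_queue):
        running = [r for r in dag_runs
                   if r["dag_id"] == dag["dag_id"] and r["state"] == "RUNNING"]
        for run in running[:dag["max_active_runs"]]:
            scheduled = run.setdefault("scheduled", set())
            for task_id in dag["task_ids"]:
                if task_id not in scheduled:
                    task_queue.append((dag["dag_id"], task_id, run["execution_date"]))
                    scheduled.add(task_id)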


> Airflow does not respect 'max_active_runs' when task from multiple dag runs cleared
> -----------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-137
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-137
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Tomasz Bartczak
>            Priority: Minor
>
> Also requested at https://github.com/apache/incubator-airflow/issues/1442
> Dear Airflow Maintainers,
> Environment
> Before I tell you about my issue, let me describe my Airflow environment:
>     Airflow version: 1.7.0
>     Airflow components: webserver, mysql, scheduler with celery executor
>     Python Version: 2.7.6
>     Operating System: Linux Ubuntu 3.19.0-26-generic
>     Scheduler runs with --num-runs and gets restarted around every minute or so
> Description of Issue
> Now that you know a little about me, let me tell you about the issue I am having:
>     What did you expect to happen?
>     After running 'airflow clear -t spark_final_observations2csv -s 2016-04-07T01:00:00 -e 2016-04-11T01:00:00 MODELLING_V6' I expected that this task gets executed in all dag runs in the specified time range, respecting 'max_active_runs'
>     Dag configuration:
>     concurrency= 3,
>     max_active_runs = 2,
>     What happened instead?
>     Airflow at first started executing 3 of those tasks, which already violates 'max_active_runs', but it looks like 'concurrency' was the applied limit here.
>     [screenshot: 3 running, 2 pending]
> After the first task was done, Airflow scheduled all the other tasks, resulting in 5 dag runs running at the same time, which violates both specified limits.
> In the GUI we saw a red warning (5/2 Dags running ;-) )
> Reproducing the Issue
> max_active_runs is respected on a day-to-day basis: when one of the tasks was stuck, Airflow didn't start more than 2 dag runs concurrently.
> [screenshots in the original issue: https://github.com/apache/incubator-airflow/issues/1442]
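
For reference, a DAG declared with the two limits quoted in this report would look roughly like this (the dag_id matches the clear command above; start_date and schedule_interval are illustrative assumptions):

    from datetime import datetime
    from airflow import DAG

    # Illustrative only: start_date and schedule_interval are made up; the
    # two limits match the values given in the report.
    dag = DAG(
        dag_id="MODELLING_V6",
        start_date=datetime(2016, 4, 1),
        schedule_interval="@daily",
        concurrency=3,        # max concurrent task instances for this dag
        max_active_runs=2,    # max dagruns in the RUNNING state at once
    )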



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)