Posted to commits@airflow.apache.org by "Dmytro Kulyk (JIRA)" <ji...@apache.org> on 2017/09/07 01:49:00 UTC
[jira] [Commented] (AIRFLOW-584) Airflow Pool does not limit running tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16156307#comment-16156307 ]
Dmytro Kulyk commented on AIRFLOW-584:
--------------------------------------
The same situation appears on 1.8.1 running the LocalExecutor with a PostgreSQL backend when running a backfill within a pool.
Case #1
Config:
* parallelism = 16
* dag_concurrency = 8
* max_active_runs_per_dag = 16
* max_threads = 2
DAG:
* setup as max_active_runs = 4
Pool:
* set Slots to 4
Result:
* 10 task instances are running; the others are initially queued, but then become "None" with the following message in the task log
{code:java}
[2017-09-06 23:46:34,211] {base_task_runner.py:95} INFO - Subtask: [2017-09-06 23:46:34,211] {models.py:1122} INFO - Dependencies not met for <TaskInstance: cube_update.update_bets_cube 2017-06-02 07:15:00 [queued]>, dependency 'Task Instance Slots Available' FAILED: The maximum number of running tasks (cube_update) for this task's DAG '8' has been reached.
{code}
Case #2
Same as Case #1, but with dag_concurrency = 16
Result:
* 16 task instances are running; all others are queued and waiting until they complete
So the pool size of 4 is not respected.
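As a minimal sketch (not Airflow source code), the running-task count should be capped by the tightest of the applicable limits; the function and numbers below are only an illustration of the expected behavior in the two cases above:

{code}
# Hypothetical sketch: the number of task instances running at once
# should be the tightest of the configured limits.
def expected_running(parallelism, dag_concurrency, pool_slots, ready_tasks):
    """How many of `ready_tasks` should run under the given limits."""
    return min(parallelism, dag_concurrency, pool_slots, ready_tasks)

# Case #1: parallelism=16, dag_concurrency=8, pool slots=4
print(expected_running(16, 8, 4, 52))   # expected 4, but 10 were observed
# Case #2: dag_concurrency raised to 16
print(expected_running(16, 16, 4, 52))  # expected 4, but 16 were observed
{code}

In both cases the pool's 4 slots should be the binding limit, yet the observed running count tracks dag_concurrency instead.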
> Airflow Pool does not limit running tasks
> -----------------------------------------
>
> Key: AIRFLOW-584
> URL: https://issues.apache.org/jira/browse/AIRFLOW-584
> Project: Apache Airflow
> Issue Type: Bug
> Components: pools
> Affects Versions: Airflow 1.7.1.3
> Environment: Ubuntu 14.04
> Reporter: David
> Attachments: img1.png, img2.png
>
>
> Airflow pools are not limiting the number of running task instances for the following dag in 1.7.1.3
> Steps to recreate:
> Create a pool of size 5 through the UI.
> The following DAG has 52 tasks with increasing priority corresponding to the task number. There should only ever be 5 tasks running at a time; however, I observed 29 'used slots' in a pool with 5 slots.
> {code}
> from datetime import datetime, timedelta
>
> from airflow import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.bash_operator import BashOperator
>
> dag_name = 'pools_bug'
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 10, 20),
>     'email_on_failure': False,
>     'retries': 1
> }
>
> dag = DAG(dag_name, default_args=default_args, schedule_interval="0 8 * * *")
>
> start = DummyOperator(task_id='start', dag=dag)
> end = DummyOperator(task_id='end', dag=dag)
>
> for i in range(50):
>     sleep_command = 'sleep 10'
>     task_name = 'task-{}'.format(i)
>     op = BashOperator(
>         task_id=task_name,
>         bash_command=sleep_command,
>         execution_timeout=timedelta(hours=4),
>         priority_weight=i,
>         pool=dag_name,
>         dag=dag)
>     start.set_downstream(op)
>     end.set_upstream(op)
> {code}
> Relevant configurations from airflow.cfg:
> {code}
> [core]
> # The executor class that airflow should use. Choices include
> # SequentialExecutor, LocalExecutor, CeleryExecutor
> executor = CeleryExecutor
> # The amount of parallelism as a setting to the executor. This defines
> # the max number of task instances that should run simultaneously
> # on this airflow installation
> parallelism = 64
> # The number of task instances allowed to run concurrently by the scheduler
> dag_concurrency = 64
> # The maximum number of active DAG runs per DAG
> max_active_runs_per_dag = 1
> [celery]
> # This section only applies if you are using the CeleryExecutor in
> # [core] section above
> # The app name that will be used by celery
> celery_app_name = airflow.executors.celery_executor
> # The concurrency that will be used when starting workers with the
> # "airflow worker" command. This defines the number of task instances that
> # a worker will take, so size up your workers based on the resources on
> # your worker box and the nature of your tasks
> celeryd_concurrency = 64
> [scheduler]
> # Task instances listen for external kill signal (when you clear tasks
> # from the CLI or the UI), this defines the frequency at which they should
> # listen (in seconds).
> job_heartbeat_sec = 5
> # The scheduler constantly tries to trigger new tasks (look at the
> # scheduler section in the docs for more information). This defines
> # how often the scheduler should run (in seconds).
> scheduler_heartbeat_sec = 5
> {code}
> !img1.png!
> !img2.png!
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)