Posted to commits@airflow.apache.org by "Artem Kirillov (JIRA)" <ji...@apache.org> on 2017/01/17 16:19:26 UTC

[jira] [Comment Edited] (AIRFLOW-74) SubdagOperators can consume all celeryd worker processes

    [ https://issues.apache.org/jira/browse/AIRFLOW-74?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826323#comment-15826323 ] 

Artem Kirillov edited comment on AIRFLOW-74 at 1/17/17 4:18 PM:
----------------------------------------------------------------

Guys, this bug breaks the whole concept of concurrency limiting in Airflow. DAGs are deadlocked all the time. Any chance of fixing it?

It looks like the issue is caused by a race condition in the DAG.concurrency_reached() method.
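
For illustration, the suspected pattern is the classic check-then-act race sketched below. This is minimal made-up Python, not the actual Airflow code: the point is that if the concurrency check and the subsequent task start are not atomic, two workers can both pass the check.

```
# Illustrative sketch only, NOT the Airflow source. 'running' stands in
# for the count of running task instances and 'concurrency_limit' for
# dag.concurrency.
import threading

concurrency_limit = 1
running = 0
started = []

def concurrency_reached():
    # Unsynchronized read of shared state: the race window opens here.
    return running >= concurrency_limit

def try_start_task(name):
    global running
    if not concurrency_reached():   # check ...
        # ... another worker may slip in and start a task here ...
        running += 1                # ... then act: the limit can be exceeded
        started.append(name)

threads = [threading.Thread(target=try_start_task, args=('task_{}'.format(i),))
           for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Usually prints ['task_0'], since the window is tiny in this toy example;
# when the race fires, both tasks start despite a limit of 1.
print(started)
```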


was (Author: akirillov):
Guys, this bug breaks the whole concept of concurrency limiting in Airflow. DAGs are deadlocked all the time. Any chance of fixing it?

> SubdagOperators can consume all celeryd worker processes
> --------------------------------------------------------
>
>                 Key: AIRFLOW-74
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-74
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery
>    Affects Versions: Airflow 1.7.1, Airflow 1.7.0, Airflow 1.6.2
>         Environment: Airflow 1.7.1rc3 with CeleryExecutor
> 1 webserver
> 1 scheduler
> 2 workers
>            Reporter: Steven Yvinec-Kruyk
>
> If the number of concurrently running ```SubDagOperator``` tasks >= the number of celery worker processes, tasks are unable to run: each SDO occupies a worker slot while it waits for its subdag's tasks, which in turn can never be picked up. All SDOs come to a complete halt. Furthermore, the performance of a DAG is drastically reduced even before full saturation of the workers, as fewer and fewer workers are available for actual tasks. A workaround for this is to specify that the ```SubDagOperator``` use ```SequentialExecutor```:
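>
> The crux of that workaround in isolation (a minimal sketch; ```child_dag``` and ```parent_dag``` are placeholder names, and the full reproduction script follows below):
> ```
> from airflow.executors import SequentialExecutor
> from airflow.operators import SubDagOperator
>
> # Run the subdag's tasks in-process: the celery slot occupied by the
> # SubDagOperator then does the work itself instead of waiting for other
> # celery slots to free up.
> sdo = SubDagOperator(
>     task_id='child',
>     subdag=child_dag,              # a subdag DAG built elsewhere (placeholder)
>     executor=SequentialExecutor(),
>     dag=parent_dag,                # placeholder parent DAG
> )
> ```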
> ```
> from datetime import timedelta, datetime
> from airflow.models import DAG, Pool
> from airflow.operators import BashOperator, SubDagOperator, DummyOperator
> from airflow.executors import SequentialExecutor
> import airflow
> # -----------------------------------------------------------------\
> # DEFINE THE POOLS
> # -----------------------------------------------------------------/
> session = airflow.settings.Session()
> for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
>     pool = (
>         session.query(Pool)
>         .filter(Pool.pool == p)
>         .first())
>     if not pool:
>         session.add(Pool(pool=p, slots=8))
>         session.commit()
> # -----------------------------------------------------------------\
> # DEFINE THE DAG
> # -----------------------------------------------------------------/
> # Define the Dag Name. This must be unique.
> dag_name = 'hanging_subdags_n16_sqe'
> # Default args are passed to each task
> default_args = {
>     'owner': 'Airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 4, 10),
>     'retries': 0,
>     'retry_delay': timedelta(minutes=5),
>     'email': ['your@email.com'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'wait_for_downstream': False,
> }
> # Create the dag object
> dag = DAG(dag_name,
>           default_args=default_args,
>           schedule_interval='0 0 * * *'
>           )
> # -----------------------------------------------------------------\
> # DEFINE THE TASKS
> # -----------------------------------------------------------------/
> def get_subdag(dag, sd_id, pool=None):
>     subdag = DAG(
>         dag_id='{parent_dag}.{sd_id}'.format(
>             parent_dag=dag.dag_id,
>             sd_id=sd_id),
>         params=dag.params,
>         default_args=dag.default_args,
>         template_searchpath=dag.template_searchpath,
>         user_defined_macros=dag.user_defined_macros,
>     )
>     t1 = BashOperator(
>         task_id='{sd_id}_step_1'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 60',
>         dag=subdag,
>         pool=pool,
>     )
>     t2 = BashOperator(
>         task_id='{sd_id}_step_two'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 15',
>         dag=subdag,
>         pool=pool,
>     )
>     t2.set_upstream(t1)
>     sdo = SubDagOperator(
>         task_id=sd_id,
>         subdag=subdag,
>         retries=0,
>         retry_delay=timedelta(seconds=5),
>         dag=dag,
>         depends_on_past=True,
>         # the workaround: run this subdag's tasks with SequentialExecutor
>         # instead of shipping them to the celery workers
>         executor=SequentialExecutor(),
>     )
>     return sdo
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
> for n in range(1, 17):
>     sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
>     sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
>     sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')
>     sd_i.set_upstream(start_task)
>     sd_ii.set_upstream(sd_i)
>     sd_iii.set_upstream(sd_ii)
> ```



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)