Posted to commits@airflow.apache.org by "Eran (JIRA)" <ji...@apache.org> on 2018/08/22 08:58:00 UTC

[jira] [Commented] (AIRFLOW-74) SubdagOperators can consume all celeryd worker processes

    [ https://issues.apache.org/jira/browse/AIRFLOW-74?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16588585#comment-16588585 ] 

Eran commented on AIRFLOW-74:
-----------------------------

1. What are the consequences of this when there is a subdag running and the scheduler process is shutting down?
2. What if I want to run multiple instances of a subdag at the same time? This default implicitly forces me to run them one by one, which seems to contradict the depends_on_past=False default (see the sketch below).

Thanks
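
On point 2: the executor can be overridden per operator instead of relying on the default. A minimal sketch, assuming the 1.10.0 behaviour where ```SubDagOperator``` defaults to ```SequentialExecutor``` (the dag and subdag objects are placeholders, and the task id is hypothetical):

```
from airflow.executors.celery_executor import CeleryExecutor
from airflow.operators.subdag_operator import SubDagOperator

# Hand the operator an explicit CeleryExecutor instance so the subdag's
# tasks are distributed to celery workers again. Note that this brings
# back the worker-saturation risk this issue describes.
section = SubDagOperator(
    task_id='my_section',        # hypothetical task id
    subdag=subdag,               # subdag DAG object built elsewhere
    executor=CeleryExecutor(),
    dag=dag,
)
```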

> SubdagOperators can consume all celeryd worker processes
> --------------------------------------------------------
>
>                 Key: AIRFLOW-74
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-74
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery
>    Affects Versions: Airflow 1.7.1, Airflow 1.7.0, Airflow 1.6.2
>         Environment: Airflow 1.7.1rc3 with CeleryExecutor
> 1 webserver
> 1 scheduler
> 2 workers
>            Reporter: Steven Yvinec-Kruyk
>            Assignee: zgl
>            Priority: Major
>             Fix For: 1.10.0
>
>
> If the number of concurrently running ```SubdagOperator``` tasks >= the number of celery worker processes, tasks are unable to run and all SDOs come to a complete halt. Furthermore, DAG performance is drastically reduced even before the workers are fully saturated, as fewer and fewer workers are available for actual tasks. A workaround is to specify that ```SequentialExecutor``` be used by the ```SubdagOperator```:
> ```
> from datetime import timedelta, datetime
> from airflow.models import DAG, Pool
> from airflow.operators import BashOperator, SubDagOperator, DummyOperator
> from airflow.executors import SequentialExecutor
> import airflow
> # -----------------------------------------------------------------\
> # DEFINE THE POOLS
> # -----------------------------------------------------------------/
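> # Create the test pools (8 slots each) only if they do not already exist: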
> session = airflow.settings.Session()
> for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
>     pool = (
>         session.query(Pool)
>         .filter(Pool.pool == p)
>         .first())
>     if not pool:
>         session.add(Pool(pool=p, slots=8))
>         session.commit()
> # -----------------------------------------------------------------\
> # DEFINE THE DAG
> # -----------------------------------------------------------------/
> # Define the Dag Name. This must be unique.
> dag_name = 'hanging_subdags_n16_sqe'
> # Default args are passed to each task
> default_args = {
>     'owner': 'Airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 4, 10),
>     'retries': 0,
>     'retry_delay': timedelta(minutes=5),
>     'email': ['your@email.com'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'wait_for_downstream': False,
> }
> # Create the dag object
> dag = DAG(dag_name,
>           default_args=default_args,
>           schedule_interval='0 0 * * *'
>           )
> # -----------------------------------------------------------------\
> # DEFINE THE TASKS
> # -----------------------------------------------------------------/
> def get_subdag(dag, sd_id, pool=None):
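>     # SubDagOperator requires the subdag's dag_id to be
>     # '<parent_dag_id>.<task_id>', which the format below satisfies.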
>     subdag = DAG(
>         dag_id='{parent_dag}.{sd_id}'.format(
>             parent_dag=dag.dag_id,
>             sd_id=sd_id),
>         params=dag.params,
>         default_args=dag.default_args,
>         template_searchpath=dag.template_searchpath,
>         user_defined_macros=dag.user_defined_macros,
>     )
>     t1 = BashOperator(
>         task_id='{sd_id}_step_1'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 60',
>         dag=subdag,
>         pool=pool,
>     )
>     t2 = BashOperator(
>         task_id='{sd_id}_step_two'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 15',
>         dag=subdag,
>         pool=pool,
>     )
>     t2.set_upstream(t1)
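>     # Workaround from the description above: give the SubDagOperator a
>     # SequentialExecutor instance so the subdag's tasks run one at a time
>     # in the slot the SubDagOperator already holds, instead of claiming
>     # further celery worker processes.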
>     sdo = SubDagOperator(
>         task_id=sd_id,
>         subdag=subdag,
>         executor=SequentialExecutor(),
>         retries=0,
>         retry_delay=timedelta(seconds=5),
>         dag=dag,
>         depends_on_past=True,
>     )
>     return sdo
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
> for n in range(1, 17):
>     sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
>     sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
>     sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')
>     sd_i.set_upstream(start_task)
>     sd_ii.set_upstream(sd_i)
>     sd_iii.set_upstream(sd_ii)
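> # 16 parallel chains of subdags: each running SubDagOperator holds a celery
> # worker process while its inner tasks wait for further processes, which is
> # how the workers saturate.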
> ```
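> Note: the ```executor``` argument belongs on the ```SubDagOperator``` itself and should be an executor instance (```SequentialExecutor()```), not the class; regular operators such as ```BashOperator``` do not take an executor.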



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)