Posted to commits@airflow.apache.org by "Eran (JIRA)" <ji...@apache.org> on 2018/08/22 08:58:00 UTC
[jira] [Commented] (AIRFLOW-74) SubdagOperators can consume all celeryd worker processes
[ https://issues.apache.org/jira/browse/AIRFLOW-74?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16588585#comment-16588585 ]
Eran commented on AIRFLOW-74:
-----------------------------
# What are the consequences of this when a subdag is running and the scheduler process shuts down?
# What if I want to run multiple instances of a subdag at the same time? This default implicitly forces me to run them one by one, which seems to contradict the depends_on_past=False default.
Thanks
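
The saturation mechanism in the quoted report can be sketched as a toy model (plain Python, not Airflow API; the slot counts are illustrative assumptions):

```python
# Toy model of the reported deadlock: each running SubDagOperator
# holds one celery worker slot for its whole lifetime, while its
# child tasks also need slots of their own to run.

def child_slots_free(worker_slots, running_subdags):
    """Slots left for actual child tasks once each waiting
    SubDagOperator has claimed one slot as a parent."""
    return max(worker_slots - running_subdags, 0)

def is_deadlocked(worker_slots, running_subdags):
    """With no free slots, child tasks can never start, so the
    waiting parents never finish: a complete halt."""
    return running_subdags > 0 and child_slots_free(worker_slots, running_subdags) == 0
```

With 4 worker processes and 3 running subdags, one slot remains for real work (performance already degraded); with 4 running subdags, every slot is held by a parent waiting on children that can never start, matching the "complete halt" described in the report.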
> SubdagOperators can consume all celeryd worker processes
> --------------------------------------------------------
>
> Key: AIRFLOW-74
> URL: https://issues.apache.org/jira/browse/AIRFLOW-74
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery
> Affects Versions: Airflow 1.7.1, Airflow 1.7.0, Airflow 1.6.2
> Environment: Airflow 1.7.1rc3 with CeleryExecutor
> 1 webserver
> 1 scheduler
> 2 workers
> Reporter: Steven Yvinec-Kruyk
> Assignee: zgl
> Priority: Major
> Fix For: 1.10.0
>
>
> If the number of concurrently running ```SubdagOperator```s is >= the number of celery worker processes, tasks are unable to run and all SDOs come to a complete halt. Furthermore, DAG performance is drastically reduced even before the workers are fully saturated, as fewer and fewer workers remain available for actual tasks. A workaround is to specify that the ```SequentialExecutor``` be used by the ```SubDagOperator```.
> ```
> from datetime import timedelta, datetime
> from airflow.models import DAG, Pool
> from airflow.operators import BashOperator, SubDagOperator, DummyOperator
> from airflow.executors import SequentialExecutor
> import airflow
> # -----------------------------------------------------------------\
> # DEFINE THE POOLS
> # -----------------------------------------------------------------/
> session = airflow.settings.Session()
> for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
>     pool = (
>         session.query(Pool)
>         .filter(Pool.pool == p)
>         .first())
>     if not pool:
>         session.add(Pool(pool=p, slots=8))
>         session.commit()
> # -----------------------------------------------------------------\
> # DEFINE THE DAG
> # -----------------------------------------------------------------/
> # Define the Dag Name. This must be unique.
> dag_name = 'hanging_subdags_n16_sqe'
> # Default args are passed to each task
> default_args = {
>     'owner': 'Airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 4, 10),
>     'retries': 0,
>     'retry_delay': timedelta(minutes=5),
>     'email': ['your@email.com'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'wait_for_downstream': False,
> }
> # Create the dag object
> dag = DAG(dag_name,
>           default_args=default_args,
>           schedule_interval='0 0 * * *'
>           )
> # -----------------------------------------------------------------\
> # DEFINE THE TASKS
> # -----------------------------------------------------------------/
> def get_subdag(dag, sd_id, pool=None):
>     subdag = DAG(
>         dag_id='{parent_dag}.{sd_id}'.format(
>             parent_dag=dag.dag_id,
>             sd_id=sd_id),
>         params=dag.params,
>         default_args=dag.default_args,
>         template_searchpath=dag.template_searchpath,
>         user_defined_macros=dag.user_defined_macros,
>     )
>     t1 = BashOperator(
>         task_id='{sd_id}_step_1'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 60',
>         dag=subdag,
>         pool=pool,
>     )
>     t2 = BashOperator(
>         task_id='{sd_id}_step_two'.format(
>             sd_id=sd_id
>         ),
>         bash_command='echo "hello" && sleep 15',
>         dag=subdag,
>         pool=pool,
>     )
>     t2.set_upstream(t1)
>     sdo = SubDagOperator(
>         task_id=sd_id,
>         subdag=subdag,
>         retries=0,
>         retry_delay=timedelta(seconds=5),
>         dag=dag,
>         depends_on_past=True,
>         # The workaround: run the subdag's tasks with the
>         # SequentialExecutor so they don't hold celery slots.
>         executor=SequentialExecutor(),
>     )
>     return sdo
> start_task = DummyOperator(
>     task_id='start',
>     dag=dag
> )
> for n in range(1, 17):
>     sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
>     sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
>     sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')
>     sd_i.set_upstream(start_task)
>     sd_ii.set_upstream(sd_i)
>     sd_iii.set_upstream(sd_ii)
> ```
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)