Posted to commits@airflow.apache.org by "Steven Yvinec-Kruyk (JIRA)" <ji...@apache.org> on 2016/05/08 15:25:13 UTC
[jira] [Created] (AIRFLOW-74) SubdagOperators can consume all celeryd worker processes
Steven Yvinec-Kruyk created AIRFLOW-74:
------------------------------------------
Summary: SubdagOperators can consume all celeryd worker processes
Key: AIRFLOW-74
URL: https://issues.apache.org/jira/browse/AIRFLOW-74
Project: Apache Airflow
Issue Type: Bug
Components: celery
Affects Versions: Airflow 1.7.1, Airflow 1.7.0, Airflow 1.6.2
Environment: Airflow 1.7.1rc3 with CeleryExecutor
    1 webserver
    1 scheduler
    2 workers
Reporter: Steven Yvinec-Kruyk
If the number of concurrently running ```SubDagOperator``` instances is >= the number of celery worker processes, tasks are unable to run and all SDOs come to a complete halt. Furthermore, the performance of a DAG is drastically reduced even before the workers are fully saturated, because progressively fewer worker processes are available for actual tasks. A workaround is to specify that ```SequentialExecutor``` be used by the ```SubDagOperator```.
```
from datetime import timedelta, datetime
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator, DummyOperator
from airflow.executors import SequentialExecutor
import airflow
# -----------------------------------------------------------------\
# DEFINE THE POOLS
# -----------------------------------------------------------------/
session = airflow.settings.Session()
for p in ['test_pool_1', 'test_pool_2', 'test_pool_3']:
    pool = (
        session.query(Pool)
        .filter(Pool.pool == p)
        .first())
    if not pool:
        session.add(Pool(pool=p, slots=8))
        session.commit()
# -----------------------------------------------------------------\
# DEFINE THE DAG
# -----------------------------------------------------------------/
# Define the Dag Name. This must be unique.
dag_name = 'hanging_subdags_n16_sqe'
# Default args are passed to each task
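default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    # Plain 4 rather than 04: a leading zero is a syntax error on Python 3
    'start_date': datetime(2016, 4, 10),
    'retries': 0,
    # The operator argument is retry_delay; retry_interval is ignored
    'retry_delay': timedelta(minutes=5),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}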
# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 0 * * *'
          )
# -----------------------------------------------------------------\
# DEFINE THE TASKS
# -----------------------------------------------------------------/
def get_subdag(dag, sd_id, pool=None):
    # Build a two-step subdag and wrap it in a SubDagOperator on the parent dag
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )
    t1 = BashOperator(
        task_id='{sd_id}_step_1'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 60',
        dag=subdag,
        pool=pool
    )
    t2 = BashOperator(
        task_id='{sd_id}_step_two'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 15',
        dag=subdag,
        pool=pool
    )
    t2.set_upstream(t1)
    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=5),
        dag=dag,
        depends_on_past=True,
        # Workaround: the executor is an argument of SubDagOperator (an
        # instance, not the class); passing it to BashOperator has no effect
        executor=SequentialExecutor(),
    )
    return sdo
start_task = DummyOperator(
    task_id='start',
    dag=dag
)
for n in range(1, 17):
    sd_i = get_subdag(dag=dag, sd_id='level_1_{n}'.format(n=n), pool='test_pool_1')
    sd_ii = get_subdag(dag=dag, sd_id='level_2_{n}'.format(n=n), pool='test_pool_2')
    sd_iii = get_subdag(dag=dag, sd_id='level_3_{n}'.format(n=n), pool='test_pool_3')
    sd_i.set_upstream(start_task)
    sd_ii.set_upstream(sd_i)
    sd_iii.set_upstream(sd_ii)
```
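The starvation described above can be illustrated with a small standalone model. This is only a sketch and not Airflow code: it assumes each running SubDagOperator holds exactly one celery worker process until its subdag finishes, and the function names and the sizing of 2 workers with 4 processes each are hypothetical, chosen purely for illustration.

```python
def free_worker_processes(worker_processes, running_subdag_operators):
    """Worker processes left over for ordinary tasks.

    Assumption: each running SubDagOperator holds exactly one worker
    process until every task inside its subdag has finished.
    """
    return max(worker_processes - running_subdag_operators, 0)


def is_starved(worker_processes, running_subdag_operators):
    """True when no worker process is left to run the subdags' own tasks."""
    return free_worker_processes(worker_processes, running_subdag_operators) == 0


# Hypothetical sizing: 2 workers with 4 processes each.
TOTAL_PROCESSES = 2 * 4

for running in (4, 8, 16):
    print(running,
          free_worker_processes(TOTAL_PROCESSES, running),
          is_starved(TOTAL_PROCESSES, running))
```

Under these assumptions, throughput falls as SubDagOperators accumulate, and once their count reaches the number of worker processes nothing is left to run the tasks they are waiting on.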