You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Steven Yvinec-Kruyk (JIRA)" <ji...@apache.org> on 2016/05/08 15:25:13 UTC

[jira] [Created] (AIRFLOW-74) SubdagOperators can consume all celeryd worker processes

Steven Yvinec-Kruyk created AIRFLOW-74:
------------------------------------------

             Summary: SubdagOperators can consume all celeryd worker processes
                 Key: AIRFLOW-74
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-74
             Project: Apache Airflow
          Issue Type: Bug
          Components: celery
    Affects Versions: Airflow 1.7.1, Airflow 1.7.0, Airflow 1.6.2
         Environment: Airflow 1.7.1rc3 with CeleryExecutor
1  webserver
1 scheduler
2 workers 
            Reporter: Steven Yvinec-Kruyk


If the number of concurrently running ```SubDagOperator``` tasks is >= the number of celery worker processes, the tasks inside the subdags are unable to run and all SDOs come to a complete halt. Furthermore, the performance of a DAG is drastically reduced even before full saturation of the workers, as fewer workers are gradually available for actual tasks. A workaround for this is to specify that the ```SequentialExecutor``` be used by the ```SubDagOperator```.

```
from datetime import timedelta, datetime
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator, DummyOperator
from airflow.executors import SequentialExecutor
import airflow


# -----------------------------------------------------------------\
# DEFINE THE POOLS
# -----------------------------------------------------------------/
# Make sure every pool used by the tasks below exists, creating any
# missing one with 8 slots. Each new pool is committed immediately.
session = airflow.settings.Session()
for pool_name in ('test_pool_1', 'test_pool_2', 'test_pool_3'):
    existing = session.query(Pool).filter(Pool.pool == pool_name).first()
    if existing:
        # Pool is already registered; leave it untouched.
        continue
    session.add(Pool(pool=pool_name, slots=8))
    session.commit()


# -----------------------------------------------------------------\
# DEFINE THE DAG
# -----------------------------------------------------------------/

# Define the DAG name. This must be unique.
dag_name = 'hanging_subdags_n16_sqe'

# Default args are passed to each task
default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    # Month written as a plain decimal: a leading-zero literal such as
    # `04` is a SyntaxError on Python 3.
    'start_date': datetime(2016, 4, 10),
    'retries': 0,
    # `retry_delay` is the keyword this script also passes to the
    # SubDagOperator; the former `retry_interval` key did not match it.
    'retry_delay': timedelta(minutes=5),
    'email': ['your@email.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Instantiate the parent DAG; the schedule is given as a cron expression.
dag = DAG(
    dag_id=dag_name,
    default_args=default_args,
    schedule_interval='0 0 * * *',
)

# -----------------------------------------------------------------\
# DEFINE THE TASKS
# -----------------------------------------------------------------/


def get_subdag(dag, sd_id, pool=None):
    """Build a two-step sub-DAG and return the SubDagOperator wrapping it.

    The sub-DAG is named ``<parent dag_id>.<sd_id>`` and reuses the parent's
    params, default_args, template_searchpath and user_defined_macros. It
    contains two BashOperator tasks, ``<sd_id>_step_1`` followed by
    ``<sd_id>_step_two``.

    :param dag: parent DAG the returned SubDagOperator is attached to
    :param sd_id: task_id of the SubDagOperator and suffix of the sub-DAG id
    :param pool: optional pool name assigned to both bash tasks
    :return: the SubDagOperator, already attached to ``dag``
    """
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='{sd_id}_step_1'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 60',
        dag=subdag,
        pool=pool,
    )

    t2 = BashOperator(
        task_id='{sd_id}_step_two'.format(
            sd_id=sd_id
        ),
        bash_command='echo "hello" && sleep 15',
        dag=subdag,
        pool=pool,
    )

    # step_two only starts once step_1 has finished.
    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        # The workaround described in this report: the executor is an
        # argument of the SubDagOperator (not of the inner BashOperators)
        # and must be an executor instance, not the class.
        executor=SequentialExecutor(),
        retries=0,
        retry_delay=timedelta(seconds=5),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

# Single entry point that every level-1 subdag hangs off.
start_task = DummyOperator(task_id='start', dag=dag)

# (task-id template, pool) for each of the three chained levels.
levels = (
    ('level_1_{n}', 'test_pool_1'),
    ('level_2_{n}', 'test_pool_2'),
    ('level_3_{n}', 'test_pool_3'),
)

# Build 16 independent chains: start -> level_1_n -> level_2_n -> level_3_n
for n in range(1, 17):
    chain = [
        get_subdag(dag=dag, sd_id=template.format(n=n), pool=pool_name)
        for template, pool_name in levels
    ]
    upstream = start_task
    for sdo in chain:
        sdo.set_upstream(upstream)
        upstream = sdo
```



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)