Posted to commits@airflow.apache.org by "Chao-Han Tsai (JIRA)" <ji...@apache.org> on 2019/05/06 04:57:00 UTC

[jira] [Assigned] (AIRFLOW-2372) SubDAGs should share dag_concurrency of parent DAG

     [ https://issues.apache.org/jira/browse/AIRFLOW-2372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chao-Han Tsai reassigned AIRFLOW-2372:
--------------------------------------

    Assignee: Chao-Han Tsai

> SubDAGs should share dag_concurrency of parent DAG
> --------------------------------------------------
>
>                 Key: AIRFLOW-2372
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2372
>             Project: Apache Airflow
>          Issue Type: Wish
>          Components: subdag
>    Affects Versions: 1.9.0
>         Environment: 1.9.0
> a local scheduler and LocalExecutor, and parallelism = 32, dag_concurrency = 16
>            Reporter: Xiao Zhu
>            Assignee: Chao-Han Tsai
>            Priority: Major
>
> It seems that subDAGs are currently scheduled just like normal DAGs. If a DAG has many parallel subDAGs, each with many operators, triggering that DAG launches all of its subDAGs as normal DAGs, and together they can easily take all of the scheduler's resources (limited by parallelism), so other DAGs have to wait for those subDAGs to finish.
> For example, suppose I have the following DAG, with a local scheduler and LocalExecutor, parallelism = 32, and dag_concurrency = 16:
> {code:python}
> import logging
> from datetime import datetime
>
> from airflow import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.subdag_operator import SubDagOperator
>
> NUM_SUBDAGS = 20
> NUM_OPS_PER_SUBDAG = 10
>
> log = logging.getLogger(__name__)
>
> def logging_func(task_num):
>     log.info("Now running id: {}".format(task_num))
>
> def build_dag(dag_id, num_ops):
>     # a start_date is required for the tasks to be schedulable
>     dag = DAG(dag_id, start_date=datetime(2018, 1, 1))
>     start_op = DummyOperator(task_id='start', dag=dag)
>     for i in range(num_ops):
>         op = PythonOperator(
>             task_id=str(i),
>             python_callable=logging_func,
>             op_args=[i],
>             dag=dag,
>         )
>         start_op >> op
>     return dag
>
> parent_id = 'consistent_failure'
> with DAG(parent_id, start_date=datetime(2018, 1, 1)) as dag:
>     start_op = DummyOperator(task_id='start')
>     for i in range(NUM_SUBDAGS):
>         task_id = "subdag_{}".format(i)
>         # a subDAG's dag_id must be '<parent_dag_id>.<task_id>'
>         op = SubDagOperator(
>             task_id=task_id,
>             subdag=build_dag("{}.{}".format(parent_id, task_id),
>                              NUM_OPS_PER_SUBDAG),
>         )
>         start_op >> op
> {code}
> When I trigger this DAG, Airflow tries to run many of the subDAGs at the same time. Since they do not share dag_concurrency with their parent DAG, each subDAG also tries to run all of its operators in parallel, which results in 500+ Python processes created by Airflow.
> Ideally those subDAGs would share dag_concurrency with their parent DAG (and thus with each other), so that when I trigger this DAG, at most 16 operators, including the ones in the subDAGs, are running at any time. A pool-based workaround is sketched below.
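> In the meantime, one way to approximate a shared limit is to route every subDAG task through a shared Airflow pool. This is a minimal sketch, assuming a pool named 'subdag_pool' (a hypothetical name) with 16 slots has already been created (Admin -> Pools in the UI); it would replace the PythonOperator call inside build_dag() above. Pools are global, so tasks from all 20 subDAGs compete for the same 16 slots:
> {code:python}
> op = PythonOperator(
>     task_id=str(i),
>     python_callable=logging_func,
>     op_args=[i],
>     pool='subdag_pool',  # assumed shared pool with 16 slots, created beforehand
>     dag=dag,
> )
> {code}
> Note that the SubDagOperator itself should stay out of that pool: if the parent operator and its subtasks compete for the same slots, the subDAG can deadlock. Passing concurrency= to each subDAG's DAG constructor would also cap parallelism, but only per subDAG; that limit is not shared across subDAGs.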



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)