You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/14 23:11:25 UTC

[GitHub] [airflow] ctimbreza commented on issue #15374: Clearing a subdag task leaves parent dag in the failed state

ctimbreza commented on issue #15374:
URL: https://github.com/apache/airflow/issues/15374#issuecomment-819907856


   Here is the dag that I used to reproduce the issue.  Subdag_1 task_2 is configured to randomly fail to help setup the scenario for this issue.
   
   ```
   from airflow.models.dag import DAG
   from airflow.operators.bash import BashOperator
   from airflow.operators.dummy import DummyOperator
   from airflow.operators.python_operator import PythonOperator
   from airflow.operators.subdag_operator import SubDagOperator
   from airflow.utils.dates import days_ago
   import random
   
   access_control = {
       "dst-view": ["can_read"],
       "dst-operator": ["can_read", "can_edit"],
       "dst-admin": ["can_read", "can_edit"]
   }
   
   def random_fail(**context):
       if random.random() > 0.5:
           raise Exception("Test failure")
       print("Success")
   
   def __build_subdag1(parent_dag, child_dag_name):
       with DAG(dag_id='%s.%s' % (parent_dag.dag_id, child_dag_name),
                default_args=parent_dag.default_args,
                schedule_interval=parent_dag.schedule_interval,
                start_date=parent_dag.start_date,
                params=parent_dag.params,
                access_control=parent_dag.access_control,
                is_paused_upon_creation=False) as subdag:
           task_1 = DummyOperator(task_id="task_1")
           task_2 = PythonOperator(task_id="task_2", python_callable=random_fail, provide_context=True)
           task_1 >> task_2
       return subdag
   
   def __build_subdag2(parent_dag, child_dag_name):
       with DAG(dag_id='%s.%s' % (parent_dag.dag_id, child_dag_name),
                default_args=parent_dag.default_args,
                schedule_interval=parent_dag.schedule_interval,
                start_date=parent_dag.start_date,
                params=parent_dag.params,
                access_control=parent_dag.access_control,
                is_paused_upon_creation=False) as subdag:
           task_1 = DummyOperator(task_id="task_1")
           task_2 = BashOperator(task_id="task_2", bash_command='sleep 15')
           task_1 >> task_2
       return subdag
   
   
   with DAG(dag_id="chris-test", start_date=days_ago(2), tags=["dev"], access_control=access_control) as dag:
   
       start = DummyOperator(task_id="start")
   
       subdag_1 = SubDagOperator(
           subdag=__build_subdag1(parent_dag=dag,
                                 child_dag_name="subdag_1"
                                 ),
           task_id="subdag_1",
           dag=dag
       )
   
       subdag_2 = SubDagOperator(
           subdag=__build_subdag2(parent_dag=dag,
                                 child_dag_name="subdag_2"
                                 ),
           task_id="subdag_2",
           dag=dag
       )
   
       end = DummyOperator(task_id='end')
   
       start >> subdag_1 >> subdag_2 >> end
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org