Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/27 16:28:06 UTC

[GitHub] [airflow] josh-fell opened a new issue, #23285: Cycle incorrectly detected in DAGs when using Labels within Task Groups

josh-fell opened a new issue, #23285:
URL: https://github.com/apache/airflow/issues/23285

   ### Apache Airflow version
   
   2.3.0b1 (pre-release)
   
   ### What happened
   
   When a DAG contains Task Groups with Labels between the tasks inside them, the DAG fails to import because a cycle is detected, even though the graph contains no cycle.
   
   Consider this DAG:
   
   ```python
   from pendulum import datetime
   
   from airflow.decorators import dag, task, task_group
   from airflow.utils.edgemodifier import Label
   
   
   @task
   def begin():
       ...
   
   
   @task
   def end():
       ...
   
   
   @dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
   def task_groups_with_edge_labels():
       @task_group
       def group():
           begin() >> Label("label") >> end()
   
       group()
   
   
   _ = task_groups_with_edge_labels()
   
   ```
   
   When attempting to import the DAG, this error message is displayed:
   <img width="1395" alt="image" src="https://user-images.githubusercontent.com/48934154/165566299-3dd65cff-5e36-47d3-a243-7bc33d4344d6.png">
   
   
   This also occurs on the `main` branch.
   
   ### What you think should happen instead
   
   Users should be able to specify Labels between tasks within a Task Group. 
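   For illustration, the expected behavior can be modelled in plain Python. This is a toy sketch, not Airflow code: `ToyTask` and `ToyLabel` are hypothetical classes built on the assumption that a Label only annotates the edge it sits on. Under that assumption, `begin >> label >> end` should yield a single `group.begin -> group.end` edge carrying the label, and `group.end` should have no downstream tasks.
   
   ```python
   from __future__ import annotations
   
   from dataclasses import dataclass, field
   
   
   @dataclass
   class ToyTask:
       """Hypothetical stand-in for an operator; not Airflow's BaseOperator."""
   
       task_id: str
       downstream: set[str] = field(default_factory=set)
   
       def __rshift__(self, other):
           if isinstance(other, ToyLabel):
               # Remember the upstream side; the edge is created once the
               # downstream task on the other side of the label is known.
               other.upstream.append(self)
               return other
           self.downstream.add(other.task_id)
           return other
   
   
   @dataclass
   class ToyLabel:
       """Hypothetical edge modifier: it labels an edge but is never a node."""
   
       label: str
       upstream: list[ToyTask] = field(default_factory=list)
       edge_info: dict[tuple[str, str], str] = field(default_factory=dict)
   
       def __rshift__(self, other: ToyTask) -> ToyTask:
           # Connect every pending upstream task directly to the downstream
           # task and record the label as metadata on that edge.
           for up in self.upstream:
               up.downstream.add(other.task_id)
               self.edge_info[(up.task_id, other.task_id)] = self.label
           return other
   
   
   begin = ToyTask("group.begin")
   end = ToyTask("group.end")
   label = ToyLabel("label")
   begin >> label >> end
   
   print(begin.downstream)  # {'group.end'}
   print(end.downstream)    # set()
   ```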
   
   ### How to reproduce
   
   - Import the DAG above into an Airflow environment.
   
   - Or, add the following unit test and run it:
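   ```python
       # Method of TestCycleTester in tests/utils/test_dag_cycle.py; DAG, DEFAULT_DATE,
       # EmptyOperator and check_cycle are imported at module level in that file.
       def test_cycle_task_group_with_edge_labels(self):
           from airflow.utils.edgemodifier import Label
           from airflow.utils.task_group import TaskGroup
   
           dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
   
           with dag:
               with TaskGroup(group_id="task_group"):
                   op1 = EmptyOperator(task_id='A')
                   op2 = EmptyOperator(task_id='B')
   
                   # The Label only annotates the A -> B edge; it should not create a cycle.
                   op1 >> Label("label") >> op2
   
           # check_cycle raises AirflowDagCycleException if it finds a cycle.
           assert not check_cycle(dag)
   ```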
   
   The test fails with an `AirflowDagCycleException`:
   ```
   tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels FAILED                                                                                                    [100%]
   
   =============================================================================================== FAILURES ===============================================================================================
   ________________________________________________________________________ TestCycleTester.test_cycle_task_group_with_edge_labels ________________________________________________________________________
   
   self = <tests.utils.test_dag_cycle.TestCycleTester testMethod=test_cycle_task_group_with_edge_labels>
   
       def test_cycle_task_group_with_edge_labels(self):
           from airflow.models.baseoperator import chain
           from airflow.utils.task_group import TaskGroup
           from airflow.utils.edgemodifier import Label
   
           dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
   
           with dag:
               with TaskGroup(group_id="task_group") as task_group:
                   op1 = EmptyOperator(task_id='A')
                   op2 = EmptyOperator(task_id='B')
   
                   op1 >> Label("label") >> op2
   
   >       assert not check_cycle(dag)
   
   tests/utils/test_dag_cycle.py:168:
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   airflow/utils/dag_cycle_tester.py:76: in check_cycle
       child_to_check = _check_adjacent_tasks(current_task_id, task)
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   
   task_id = 'task_group.B', current_task = <Task(EmptyOperator): task_group.B>
   
       def _check_adjacent_tasks(task_id, current_task):
           """Returns first untraversed child task, else None if all tasks traversed."""
           for adjacent_task in current_task.get_direct_relative_ids():
               if visited[adjacent_task] == CYCLE_IN_PROGRESS:
                   msg = f"Cycle detected in DAG. Faulty task: {task_id}"
   >               raise AirflowDagCycleException(msg)
   E               airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B
   
   airflow/utils/dag_cycle_tester.py:62: AirflowDagCycleException
   ---------------------------------------------------------------------------------------- Captured stdout setup -----------------------------------------------------------------------------------------
   ========================= AIRFLOW ==========================
   Home of the user: /root
   Airflow home /root/airflow
   Skipping initializing of the DB as it was initialized already.
   You can re-initialize the database by adding --with-db-init flag when running tests.
   ======================================================================================= short test summary info ========================================================================================
   FAILED tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels - airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B
   ==================================================================================== 1 failed, 2 warnings in 1.08s =====================================================================================
   ```
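   The traceback shows how the failure is raised: `_check_adjacent_tasks` walks `get_direct_relative_ids()` and raises as soon as a relative is still marked `CYCLE_IN_PROGRESS`, i.e. it is still on the current depth-first path. Below is a simplified, standalone sketch of that three-state DFS idea; it is not Airflow's implementation, and `check_cycle_sketch`, `CycleError`, `CYCLE_NEW` and `CYCLE_DONE` are hypothetical names. The graph is assumed to be a plain dict mapping each task id to its direct downstream ids.
   
   ```python
   from __future__ import annotations
   
   # Hypothetical visit states for an iterative depth-first search.
   CYCLE_NEW, CYCLE_IN_PROGRESS, CYCLE_DONE = 0, 1, 2
   
   
   class CycleError(Exception):
       """Hypothetical stand-in for AirflowDagCycleException."""
   
   
   def check_cycle_sketch(downstream: dict[str, set[str]]) -> None:
       """Raise CycleError if the graph contains a cycle, else return None.
   
       ``downstream`` maps every task_id to the ids of its direct downstream
       tasks; every referenced id is assumed to be a key of the dict.
       """
       visited = {task_id: CYCLE_NEW for task_id in downstream}
   
       for root in downstream:
           if visited[root] != CYCLE_NEW:
               continue
           visited[root] = CYCLE_IN_PROGRESS
           # Each stack entry is (task_id, iterator over its remaining children).
           stack = [(root, iter(sorted(downstream[root])))]
           while stack:
               task_id, children = stack[-1]
               child = next(children, None)
               if child is None:
                   # All children explored: this task is finished.
                   visited[task_id] = CYCLE_DONE
                   stack.pop()
               elif visited[child] == CYCLE_IN_PROGRESS:
                   # The child is still on the current path, so an edge closed a loop.
                   raise CycleError(f"Cycle detected in DAG. Faulty task: {task_id}")
               elif visited[child] == CYCLE_NEW:
                   visited[child] = CYCLE_IN_PROGRESS
                   stack.append((child, iter(sorted(downstream[child]))))
   
   
   # A -> B is acyclic; adding a B -> B self-edge makes B its own in-progress child.
   check_cycle_sketch({"task_group.A": {"task_group.B"}, "task_group.B": set()})
   try:
       check_cycle_sketch({"task_group.A": {"task_group.B"}, "task_group.B": {"task_group.B"}})
   except CycleError as exc:
       print(exc)  # Cycle detected in DAG. Faulty task: task_group.B
   ```
   
   A self-edge is the shortest possible cycle, so the sketch reports `task_group.B` as faulty, which is consistent with the failure above.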
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   N/A
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   This issue also occurs on the `main` branch using Breeze.
   
   ### Anything else
   
   Possibly related to #21404
   
   When the Label is removed, no cycle is detected.
   ```python
   from pendulum import datetime
   
   from airflow.decorators import dag, task, task_group
   from airflow.utils.edgemodifier import Label
   
   
   @task
   def begin():
       ...
   
   
   @task
   def end():
       ...
   
   
   @dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
   def task_groups_with_edge_labels():
       @task_group
       def group():
           begin() >> end()
   
       group()
   
   
   _ = task_groups_with_edge_labels()
   ```
   <img width="1437" alt="image" src="https://user-images.githubusercontent.com/48934154/165566908-a521d685-a032-482e-9e6b-ef85f0743e64.png">
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kaxil commented on issue #23285: Cycle incorrectly detected in DAGs when using Labels within Task Groups

Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #23285:
URL: https://github.com/apache/airflow/issues/23285#issuecomment-1111214610

   Can you try this with 2.3.0rc1 too? 


[GitHub] [airflow] ashb commented on issue #23285: Cycle incorrectly detected in DAGs when using Labels within Task Groups

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #23285:
URL: https://github.com/apache/airflow/issues/23285#issuecomment-1111264110

   Curious.
   
   ```
   (Pdb++) p current_task.get_direct_relative_ids()
   {'group.end'}
   (Pdb++) p current_task
   <Task(_PythonDecoratedOperator): group.end>
   ```
   
   `group.end` thinks its downstream task is... itself!
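   
   A task listed among its own direct downstream ids is a one-node cycle, so any cycle check will reject it. When debugging, such edges can be spotted directly. The helper below is hypothetical (not an Airflow API) and works on a plain dict of task id to direct downstream ids.
   
   ```python
   from __future__ import annotations
   
   
   def find_self_loops(downstream: dict[str, set[str]]) -> list[str]:
       """Return the task ids that appear in their own set of downstream ids."""
       return sorted(
           task_id for task_id, children in downstream.items() if task_id in children
       )
   
   
   # What the Pdb session shows for group.end (other tasks omitted).
   observed = {"group.end": {"group.end"}}
   # What begin() >> Label("label") >> end() should produce instead.
   expected = {"group.begin": {"group.end"}, "group.end": set()}
   
   print(find_self_loops(observed))  # ['group.end']
   print(find_self_loops(expected))  # []
   ```
   
   Against a real DAG, the input could be built with something like `{t.task_id: t.get_direct_relative_ids() for t in dag.tasks}`.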


[GitHub] [airflow] ashb closed issue #23285: Cycle incorrectly detected in DAGs when using Labels within Task Groups

Posted by GitBox <gi...@apache.org>.
ashb closed issue #23285: Cycle incorrectly detected in DAGs when using Labels within Task Groups
URL: https://github.com/apache/airflow/issues/23285
