Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/27 16:28:06 UTC
[GitHub] [airflow] josh-fell opened a new issue, #23285: Cycle incorrectly detected in DAGs when using Labels within Task Groups
josh-fell opened a new issue, #23285:
URL: https://github.com/apache/airflow/issues/23285
### Apache Airflow version
2.3.0b1 (pre-release)
### What happened
When a DAG contains Task Groups and Labels are placed between tasks inside those Task Groups, the DAG fails to import because cycle detection reports a cycle.
Consider this DAG:
```python
from pendulum import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.edgemodifier import Label


@task
def begin():
    ...


@task
def end():
    ...


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def task_groups_with_edge_labels():
    @task_group
    def group():
        begin() >> Label("label") >> end()

    group()


_ = task_groups_with_edge_labels()
```
When attempting to import the DAG, this error message is displayed:
<img width="1395" alt="image" src="https://user-images.githubusercontent.com/48934154/165566299-3dd65cff-5e36-47d3-a243-7bc33d4344d6.png">
This also occurs on the `main` branch.
### What you think should happen instead
Users should be able to specify Labels between tasks within a Task Group.
### How to reproduce
- Use the DAG above and try to import it into an Airflow environment.
- Or add the following unit test and run it.
```python
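# Method of TestCycleTester in tests/utils/test_dag_cycle.py; it relies on
# DAG, EmptyOperator, check_cycle and DEFAULT_DATE being in scope in that module.
def test_cycle_task_group_with_edge_labels(self):
    from airflow.models.baseoperator import chain
    from airflow.utils.task_group import TaskGroup
    from airflow.utils.edgemodifier import Label

    dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
    with dag:
        with TaskGroup(group_id="task_group") as task_group:
            op1 = EmptyOperator(task_id='A')
            op2 = EmptyOperator(task_id='B')

            op1 >> Label("label") >> op2

    # Expected to pass: A >> B is acyclic, with or without a Label on the edge.
    assert not check_cycle(dag)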
```
The test fails with an `AirflowDagCycleException`:
```
tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels FAILED [100%]
=============================================================================================== FAILURES ===============================================================================================
________________________________________________________________________ TestCycleTester.test_cycle_task_group_with_edge_labels ________________________________________________________________________
self = <tests.utils.test_dag_cycle.TestCycleTester testMethod=test_cycle_task_group_with_edge_labels>
def test_cycle_task_group_with_edge_labels(self):
from airflow.models.baseoperator import chain
from airflow.utils.task_group import TaskGroup
from airflow.utils.edgemodifier import Label
dag = DAG('dag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
with dag:
with TaskGroup(group_id="task_group") as task_group:
op1 = EmptyOperator(task_id='A')
op2 = EmptyOperator(task_id='B')
op1 >> Label("label") >> op2
> assert not check_cycle(dag)
tests/utils/test_dag_cycle.py:168:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airflow/utils/dag_cycle_tester.py:76: in check_cycle
child_to_check = _check_adjacent_tasks(current_task_id, task)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
task_id = 'task_group.B', current_task = <Task(EmptyOperator): task_group.B>
def _check_adjacent_tasks(task_id, current_task):
"""Returns first untraversed child task, else None if all tasks traversed."""
for adjacent_task in current_task.get_direct_relative_ids():
if visited[adjacent_task] == CYCLE_IN_PROGRESS:
msg = f"Cycle detected in DAG. Faulty task: {task_id}"
> raise AirflowDagCycleException(msg)
E airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B
airflow/utils/dag_cycle_tester.py:62: AirflowDagCycleException
---------------------------------------------------------------------------------------- Captured stdout setup -----------------------------------------------------------------------------------------
========================= AIRFLOW ==========================
Home of the user: /root
Airflow home /root/airflow
Skipping initializing of the DB as it was initialized already.
You can re-initialize the database by adding --with-db-init flag when running tests.
======================================================================================= short test summary info ========================================================================================
FAILED tests/utils/test_dag_cycle.py::TestCycleTester::test_cycle_task_group_with_edge_labels - airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: task_group.B
==================================================================================== 1 failed, 2 warnings in 1.08s =====================================================================================
```
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
N/A
### Deployment
Astronomer
### Deployment details
This issue also occurs on the `main` branch using Breeze.
### Anything else
Possibly related to #21404
When the Label is removed, no cycle is detected.
```python
from pendulum import datetime

from airflow.decorators import dag, task, task_group
from airflow.utils.edgemodifier import Label


@task
def begin():
    ...


@task
def end():
    ...


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def task_groups_with_edge_labels():
    @task_group
    def group():
        begin() >> end()

    group()


_ = task_groups_with_edge_labels()
```
<img width="1437" alt="image" src="https://user-images.githubusercontent.com/48934154/165566908-a521d685-a032-482e-9e6b-ef85f0743e64.png">
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil commented on issue #23285: Cycle incorrectly detected in DAGs when using Labels within Task Groups
Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #23285:
URL: https://github.com/apache/airflow/issues/23285#issuecomment-1111214610
Can you try this with 2.3.0rc1 too?
[GitHub] [airflow] ashb commented on issue #23285: Cycle incorrectly detected in DAGs when using Labels within Task Groups
Posted by GitBox <gi...@apache.org>.
ashb commented on issue #23285:
URL: https://github.com/apache/airflow/issues/23285#issuecomment-1111264110
Curious.
```
(Pdb++) p current_task.get_direct_relative_ids()
{'group.end'}
(Pdb++) p current_task
<Task(_PythonDecoratedOperator): group.end>
```
`group.end` thinks its downstream task is ... itself!
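To illustrate why a self-referencing task trips the check, here is a minimal standalone sketch of a depth-first cycle check over plain dictionaries. It is not Airflow's implementation: `CycleError` and this `check_cycle` are hypothetical stand-ins, and of the three state names only `CYCLE_IN_PROGRESS` appears in the traceback above; the other two are assumptions.

```python
from collections import defaultdict

# Visit states. Only CYCLE_IN_PROGRESS is taken from the traceback;
# CYCLE_NEW and CYCLE_DONE are names assumed for this sketch.
CYCLE_NEW = 0
CYCLE_IN_PROGRESS = 1
CYCLE_DONE = 2


class CycleError(Exception):
    """Hypothetical stand-in for AirflowDagCycleException."""


def check_cycle(downstream):
    """Raise CycleError if the graph contains a cycle.

    `downstream` maps each task id to the set of its direct downstream ids.
    """
    visited = defaultdict(int)  # every task starts as CYCLE_NEW

    def visit(task_id):
        visited[task_id] = CYCLE_IN_PROGRESS
        for child in sorted(downstream.get(task_id, ())):
            # Reaching a task that is still being explored means we looped back.
            if visited[child] == CYCLE_IN_PROGRESS:
                raise CycleError(f"Cycle detected in DAG. Faulty task: {task_id}")
            if visited[child] == CYCLE_NEW:
                visit(child)
        visited[task_id] = CYCLE_DONE

    for task_id in sorted(downstream):
        if visited[task_id] == CYCLE_NEW:
            visit(task_id)


# Expected graph: begin -> end, no cycle, so no exception is raised.
check_cycle({"group.begin": {"group.end"}, "group.end": set()})

# Graph as seen in the debugger: group.end lists itself as downstream.
try:
    check_cycle({"group.begin": {"group.end"}, "group.end": {"group.end"}})
except CycleError as err:
    message = str(err)  # "Cycle detected in DAG. Faulty task: group.end"
```

A task that lists itself as downstream is still marked in progress when its own id comes up as a child, so the check reports it as the faulty task. That matches the `Faulty task: task_group.B` message in the test output.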
[GitHub] [airflow] ashb closed issue #23285: Cycle incorrectly detected in DAGs when using Labels within Task Groups
Posted by GitBox <gi...@apache.org>.
ashb closed issue #23285: Cycle incorrectly detected in DAGs when using Labels within Task Groups
URL: https://github.com/apache/airflow/issues/23285