Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/22 18:03:31 UTC

[GitHub] [airflow] chenglongyan opened a new pull request #21688: Simplify check_cycle() implement

chenglongyan opened a new pull request #21688:
URL: https://github.com/apache/airflow/pull/21688


   `DAG.topological_sort()` seems to do the same thing?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] chenglongyan closed pull request #21688: Simplify check_cycle() implement

Posted by GitBox <gi...@apache.org>.
chenglongyan closed pull request #21688:
URL: https://github.com/apache/airflow/pull/21688


   





[GitHub] [airflow] uranusjr commented on a change in pull request #21688: Simplify check_cycle() implement

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21688:
URL: https://github.com/apache/airflow/pull/21688#discussion_r810688335



##########
File path: airflow/utils/dag_cycle_tester.py
##########
@@ -49,33 +42,4 @@ def check_cycle(dag: 'DAG') -> None:
 
     :raises AirflowDagCycleException: If cycle is found in the DAG.
     """
-    # default of int is 0 which corresponds to CYCLE_NEW
-    visited: Dict[str, int] = defaultdict(int)
-    path_stack: Deque[str] = deque()
-    task_dict = dag.task_dict
-
-    def _check_adjacent_tasks(task_id, current_task):
-        """Returns first untraversed child task, else None if all tasks traversed."""
-        for adjacent_task in current_task.get_direct_relative_ids():
-            if visited[adjacent_task] == CYCLE_IN_PROGRESS:
-                msg = f"Cycle detected in DAG. Faulty task: {task_id}"
-                raise AirflowDagCycleException(msg)
-            elif visited[adjacent_task] == CYCLE_NEW:
-                return adjacent_task
-        return None
-
-    for dag_task_id in dag.task_dict.keys():
-        if visited[dag_task_id] == CYCLE_DONE:
-            continue
-        path_stack.append(dag_task_id)
-        while path_stack:
-            current_task_id = path_stack[-1]
-            if visited[current_task_id] == CYCLE_NEW:
-                visited[current_task_id] = CYCLE_IN_PROGRESS
-            task = task_dict[current_task_id]
-            child_to_check = _check_adjacent_tasks(current_task_id, task)
-            if not child_to_check:
-                visited[current_task_id] = CYCLE_DONE
-                path_stack.pop()
-            else:
-                path_stack.append(child_to_check)
+    dag.topological_sort(include_subdag_tasks=False)

Review comment:
       `topological_sort` calls `topological_sort` on each sub-DAG, but with `include_subdag_tasks=False` (which excludes sub-DAGs), it is _not_ recursive.
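
For readers who want to try the removed algorithm outside Airflow, here is a minimal sketch of the same three-state, stack-based depth-first search. It assumes a plain adjacency dict in place of `dag.task_dict`, and `CycleError` is a hypothetical stand-in for `AirflowDagCycleException`. It is an illustration of the technique, not the code in `dag_cycle_tester.py`.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Optional

# Node states; 0 is the defaultdict(int) default, so unseen nodes are "new".
CYCLE_NEW, CYCLE_IN_PROGRESS, CYCLE_DONE = 0, 1, 2


class CycleError(Exception):
    """Hypothetical stand-in for AirflowDagCycleException."""


def check_cycle(graph: Dict[str, List[str]]) -> None:
    """Raise CycleError if the directed graph (node -> children) has a cycle."""
    visited: Dict[str, int] = defaultdict(int)
    path_stack: Deque[str] = deque()

    for start in graph:
        if visited[start] == CYCLE_DONE:
            continue
        path_stack.append(start)
        while path_stack:
            current = path_stack[-1]
            if visited[current] == CYCLE_NEW:
                visited[current] = CYCLE_IN_PROGRESS
            # Find the first child that has not been traversed yet.
            child_to_check: Optional[str] = None
            for child in graph.get(current, []):
                if visited[child] == CYCLE_IN_PROGRESS:
                    # The child is already on the current path: back edge.
                    raise CycleError(f"Cycle detected. Faulty task: {current}")
                if visited[child] == CYCLE_NEW:
                    child_to_check = child
                    break
            if child_to_check is None:
                # All children are done, so this node is done too.
                visited[current] = CYCLE_DONE
                path_stack.pop()
            else:
                path_stack.append(child_to_check)
```

Because the traversal keeps its own explicit stack, it does not use Python recursion, which is presumably why the question of whether `topological_sort` is recursive matters for deep or nested DAGs.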







[GitHub] [airflow] uranusjr commented on a change in pull request #21688: Simplify check_cycle() implement

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21688:
URL: https://github.com/apache/airflow/pull/21688#discussion_r810688335



Review comment:
       `topological_sort` is recursive: it calls `topological_sort` on each sub-DAG.







[GitHub] [airflow] chenglongyan commented on a change in pull request #21688: Simplify check_cycle() implement

Posted by GitBox <gi...@apache.org>.
chenglongyan commented on a change in pull request #21688:
URL: https://github.com/apache/airflow/pull/21688#discussion_r811031602



Review comment:
       Yes, if `include_subdag_tasks=True` and the DAG has a sub-DAG, it would be recursive.







[GitHub] [airflow] mik-laj commented on a change in pull request #21688: Simplify check_cycle() implement

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #21688:
URL: https://github.com/apache/airflow/pull/21688#discussion_r810529468



Review comment:
       Is the sort implementation recursive or not?







[GitHub] [airflow] uranusjr commented on pull request #21688: Simplify check_cycle() implement

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #21688:
URL: https://github.com/apache/airflow/pull/21688#issuecomment-1046326977


   If the two functions are indeed equivalent, perhaps we should deprecate `check_cycle` altogether. Also, if we are to replace `check_cycle` with `topological_sort` (whether or not we deprecate the former), the latter needs to be modified to raise `AirflowDagCycleException` instead, for backward compatibility. (Fortunately, it currently raises `AirflowException`, which is the superclass of `AirflowDagCycleException`, so changing the exception type would be backward compatible.)
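
The compatibility argument above can be illustrated with a small, self-contained sketch. It assumes a generic Kahn-style topological sort over a plain adjacency dict; the thread does not show Airflow's own `topological_sort`, so this is not that implementation. `BaseDagError` and `DagCycleError` are hypothetical stand-ins for `AirflowException` and `AirflowDagCycleException`.

```python
from typing import Dict, List


class BaseDagError(Exception):
    """Hypothetical stand-in for AirflowException."""


class DagCycleError(BaseDagError):
    """Hypothetical stand-in for AirflowDagCycleException."""


def topological_sort(graph: Dict[str, List[str]]) -> List[str]:
    """Return nodes in dependency order, or raise DagCycleError on a cycle."""
    # Count incoming edges for every node, including pure leaves.
    indegree = {node: 0 for node in graph}
    for children in graph.values():
        for child in children:
            indegree[child] = indegree.get(child, 0) + 1

    ready = [node for node, degree in indegree.items() if degree == 0]
    order: List[str] = []
    while ready:
        node = ready.pop()
        order.append(node)
        for child in graph.get(node, []):
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # Nodes on a cycle never reach indegree 0, so they are never emitted.
    if len(order) != len(indegree):
        raise DagCycleError("A cyclic dependency occurred")
    return order


def sort_or_none(graph: Dict[str, List[str]]):
    """Caller written against the superclass; it still catches the subclass."""
    try:
        return topological_sort(graph)
    except BaseDagError:
        return None
```

Existing callers that catch the broader exception keep working when the raised type is narrowed to a subclass, while callers of the old `check_cycle` contract can catch the specific one.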





[GitHub] [airflow] uranusjr commented on a change in pull request #21688: Simplify check_cycle() implement

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21688:
URL: https://github.com/apache/airflow/pull/21688#discussion_r810688105



Review comment:
       Would be nice if we could add a few tests for this to find out.







[GitHub] [airflow] chenglongyan commented on pull request #21688: Simplify check_cycle() implement

Posted by GitBox <gi...@apache.org>.
chenglongyan commented on pull request #21688:
URL: https://github.com/apache/airflow/pull/21688#issuecomment-1046803010


   I think `check_cycle` should be kept, because it is more readable.

