Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/07 01:17:31 UTC

[GitHub] [airflow] jedcunningham opened a new pull request, #26188: Fix edge detection iteration

jedcunningham opened a new pull request, #26188:
URL: https://github.com/apache/airflow/pull/26188

   In #26175, edge detection was changed to use iteration instead of recursion; however, it wasn't keeping track of the parent tasks. This changes the set we iterate over to hold `(task, {children})` pairs instead of just adding all the child tasks to a set.
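
To make the difference concrete, here is a minimal sketch of both loops on a three-task chain `a -> b -> c`. The `Task` class and the two function names are hypothetical stand-ins for illustration only, not Airflow code; they are modelled on the before and after sides of the diff in this PR.

```python
class Task:
    """Hypothetical stand-in for an Airflow task (not the real class)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream_list = []


def edges_without_parent(root):
    # Modelled on the pre-fix loop: `root` is used as the parent of every
    # descendant, because only the children are carried to the next round.
    edges = set()
    tasks_to_trace = list(root.downstream_list)
    while tasks_to_trace:
        tasks_to_trace_next = []
        for child in tasks_to_trace:
            edge = (root.task_id, child.task_id)
            if edge in edges:
                continue
            tasks_to_trace_next.extend(child.downstream_list)
            edges.add(edge)
        tasks_to_trace = tasks_to_trace_next
    return edges


def edges_with_parent(root):
    # Modelled on the fix: each entry pairs a task with its own children,
    # so the edge is always built from the real parent.
    edges = set()
    tasks_to_trace = {(root, frozenset(root.downstream_list))}
    while tasks_to_trace:
        tasks_to_trace_next = set()
        for task, children in tasks_to_trace:
            for child in children:
                edge = (task.task_id, child.task_id)
                if edge in edges:
                    continue
                edges.add(edge)
                tasks_to_trace_next.add((child, frozenset(child.downstream_list)))
        tasks_to_trace = tasks_to_trace_next
    return edges


# Build the chain a -> b -> c.
a, b, c = Task("a"), Task("b"), Task("c")
a.downstream_list.append(b)
b.downstream_list.append(c)

print(edges_without_parent(a))  # contains the bogus edge ("a", "c")
print(edges_with_parent(a))     # contains the real edge ("b", "c")
```

In this sketch the first loop reports `("a", "c")` and misses `("b", "c")`, which is the kind of wrong edge this PR is meant to fix.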


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #26188:
URL: https://github.com/apache/airflow/pull/26188#discussion_r964530820


##########
airflow/models/taskmixin.py:
##########
@@ -236,14 +237,14 @@ def set_upstream(
         self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
 
     @property
-    def downstream_list(self) -> Iterable["DAGNode"]:
+    def downstream_list(self) -> Iterable["Operator"]:

Review Comment:
   @uranusjr Does this need changing or is it fine as it is?





[GitHub] [airflow] uranusjr commented on a diff in pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26188:
URL: https://github.com/apache/airflow/pull/26188#discussion_r964369811


##########
airflow/www/views.py:
##########
@@ -597,15 +597,16 @@ def collect_edges(task_group):
     edges = set()
 
     def get_downstream(task):
-        tasks_to_trace = task.downstream_list
+        tasks_to_trace = {(task, frozenset(task.downstream_list))}
         while tasks_to_trace:
-            tasks_to_trace_next: Set[str] = set()
-            for child in tasks_to_trace:
-                edge = (task.task_id, child.task_id)
-                if edge in edges:
-                    continue
-                tasks_to_trace_next.update(child.downstream_list)
-                edges.add(edge)
+            tasks_to_trace_next: Set[tuple] = set()
+            for task, children in tasks_to_trace:

Review Comment:
   Does `children` change when we are iterating through? If not, we can simply do this
   
   ```python
   tasks_to_trace = [task]
   while tasks_to_trace:
       tasks_to_trace_next = []
       for task in tasks_to_trace:
           for child in task.downstream_list:
               edge = (task.task_id, child.task_id)
               if edge in edges:
                   continue
               edges.add(edge)
               tasks_to_trace_next.append(child)
       tasks_to_trace = tasks_to_trace_next
   ```





[GitHub] [airflow] uranusjr commented on a diff in pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26188:
URL: https://github.com/apache/airflow/pull/26188#discussion_r964439179


##########
airflow/models/taskmixin.py:
##########
@@ -236,14 +237,14 @@ def set_upstream(
         self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
 
     @property
-    def downstream_list(self) -> Iterable["DAGNode"]:
+    def downstream_list(self) -> Iterable["Operator"]:

Review Comment:
   Oh I don’t think you can do this… Many have tried this before and failed 🙂





[GitHub] [airflow] ashb commented on pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #26188:
URL: https://github.com/apache/airflow/pull/26188#issuecomment-1239045570

   Curious, I wonder why that PR passed its tests




[GitHub] [airflow] jedcunningham commented on a diff in pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #26188:
URL: https://github.com/apache/airflow/pull/26188#discussion_r964441454


##########
airflow/models/taskmixin.py:
##########
@@ -236,14 +237,14 @@ def set_upstream(
         self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
 
     @property
-    def downstream_list(self) -> Iterable["DAGNode"]:
+    def downstream_list(self) -> Iterable["Operator"]:

Review Comment:
   Interesting, [get_task](https://github.com/apache/airflow/blob/a901ad93faf80f0f4399154ac52920f9da1316e7/airflow/models/dag.py#L2232) is apparently returning an Operator?





[GitHub] [airflow] uranusjr commented on a diff in pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26188:
URL: https://github.com/apache/airflow/pull/26188#discussion_r964369811


##########
airflow/www/views.py:
##########
@@ -597,15 +597,16 @@ def collect_edges(task_group):
     edges = set()
 
     def get_downstream(task):
-        tasks_to_trace = task.downstream_list
+        tasks_to_trace = {(task, frozenset(task.downstream_list))}
         while tasks_to_trace:
-            tasks_to_trace_next: Set[str] = set()
-            for child in tasks_to_trace:
-                edge = (task.task_id, child.task_id)
-                if edge in edges:
-                    continue
-                tasks_to_trace_next.update(child.downstream_list)
-                edges.add(edge)
+            tasks_to_trace_next: Set[tuple] = set()
+            for task, children in tasks_to_trace:

Review Comment:
   Does `children` change when we are iterating through? If not, we can simply do this (type hints omitted)
   
   ```python
   tasks_to_trace = [task]
   while tasks_to_trace:
       tasks_to_trace_next = []
       for task in tasks_to_trace:
           for child in task.downstream_list:
               edge = (task.task_id, child.task_id)
               if edge in edges:
                   continue
               edges.add(edge)
               tasks_to_trace_next.append(child)
       tasks_to_trace = tasks_to_trace_next
   ```





[GitHub] [airflow] potiuk commented on pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26188:
URL: https://github.com/apache/airflow/pull/26188#issuecomment-1239362050

   That also helps to track down cases where either the tests or the code have some implicit dependencies or are testing unrelated code.




[GitHub] [airflow] uranusjr commented on a diff in pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26188:
URL: https://github.com/apache/airflow/pull/26188#discussion_r964468630


##########
airflow/models/taskmixin.py:
##########
@@ -236,14 +237,14 @@ def set_upstream(
         self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
 
     @property
-    def downstream_list(self) -> Iterable["DAGNode"]:
+    def downstream_list(self) -> Iterable["Operator"]:

Review Comment:
   Yes but… well this thing is a mess.





[GitHub] [airflow] uranusjr commented on a diff in pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26188:
URL: https://github.com/apache/airflow/pull/26188#discussion_r964439621


##########
airflow/www/views.py:
##########
@@ -596,21 +596,17 @@ def collect_edges(task_group):
     # Collect all the edges between individual tasks
     edges = set()
 
-    def get_downstream(task):
-        tasks_to_trace = {(task, frozenset(task.downstream_list))}
-        while tasks_to_trace:
-            tasks_to_trace_next: Set[tuple] = set()
-            for task, children in tasks_to_trace:
-                for child in children:
-                    edge = (task.task_id, child.task_id)
-                    if edge in edges:
-                        continue
-                    edges.add(edge)
-                    tasks_to_trace_next.add((child, frozenset(child.downstream_list)))
-            tasks_to_trace = tasks_to_trace_next
-
-    for root in dag.roots:
-        get_downstream(root)
+    tasks_to_trace: List[Operator] = dag.roots
+    while tasks_to_trace:
+        tasks_to_trace_next: List[Operator] = []
+        for task in tasks_to_trace:
+            for child in task.downstream_list:
+                edge = (task.task_id, child.task_id)

Review Comment:
   Use `node_id` here to avoid task and child needing to be an Operator.
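
A rough sketch of why a shared identifier helps here. All class and function names below are hypothetical and only loosely modelled on the `DAGNode`/`Operator` split discussed in this thread; the assumption is that every node type exposes `node_id`, while only task-like nodes have `task_id`.

```python
from abc import ABC, abstractmethod


class Node(ABC):
    """Hypothetical base class: anything that can be an endpoint of an edge."""

    @property
    @abstractmethod
    def node_id(self) -> str:
        """Identifier that every node type provides."""


class TaskNode(Node):
    """Hypothetical task-like node; it has a task_id."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    @property
    def node_id(self) -> str:
        return self.task_id


class GroupNode(Node):
    """Hypothetical group-like node; it has no task_id at all."""

    def __init__(self, group_id: str) -> None:
        self.group_id = group_id

    @property
    def node_id(self) -> str:
        return self.group_id


def make_edge(parent: Node, child: Node) -> tuple:
    # Only relies on the base-class contract, so the annotations can stay
    # as the general node type instead of the narrower task type.
    return (parent.node_id, child.node_id)


print(make_edge(TaskNode("extract"), GroupNode("transform")))
```

Under these assumptions, building the edge from `node_id` type-checks against the base class, whereas `task_id` would force both ends to be annotated as the narrower type.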





[GitHub] [airflow] ashb commented on pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #26188:
URL: https://github.com/apache/airflow/pull/26188#issuecomment-1239411382

   This is passing its tests. If you want to fix up types, @uranusjr, we can do that in a follow-up PR.




[GitHub] [airflow] jedcunningham commented on a diff in pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #26188:
URL: https://github.com/apache/airflow/pull/26188#discussion_r964437252


##########
airflow/www/views.py:
##########
@@ -597,15 +597,16 @@ def collect_edges(task_group):
     edges = set()
 
     def get_downstream(task):
-        tasks_to_trace = task.downstream_list
+        tasks_to_trace = {(task, frozenset(task.downstream_list))}
         while tasks_to_trace:
-            tasks_to_trace_next: Set[str] = set()
-            for child in tasks_to_trace:
-                edge = (task.task_id, child.task_id)
-                if edge in edges:
-                    continue
-                tasks_to_trace_next.update(child.downstream_list)
-                edges.add(edge)
+            tasks_to_trace_next: Set[tuple] = set()
+            for task, children in tasks_to_trace:

Review Comment:
   I don't believe it does. Good call, I like that even better, thanks!





[GitHub] [airflow] potiuk commented on pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26188:
URL: https://github.com/apache/airflow/pull/26188#issuecomment-1239392671

   Fix (moving `dag_edges` and `task_groups_to_dict`) in #26212. It depends on this one, so it should be merged after.




[GitHub] [airflow] ashb commented on pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #26188:
URL: https://github.com/apache/airflow/pull/26188#issuecomment-1239409142

   Ahhhh yeah that makes sense. The price we pay for having quicker tests is we sometimes miss things like this. (Totally worth it!) 




[GitHub] [airflow] ashb commented on a diff in pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #26188:
URL: https://github.com/apache/airflow/pull/26188#discussion_r964530820


##########
airflow/models/taskmixin.py:
##########
@@ -236,14 +237,14 @@ def set_upstream(
         self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
 
     @property
-    def downstream_list(self) -> Iterable["DAGNode"]:
+    def downstream_list(self) -> Iterable["Operator"]:

Review Comment:
   @uranusjr Does this need changing or is it fine as it is? (It's passing type checks)





[GitHub] [airflow] potiuk commented on pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26188:
URL: https://github.com/apache/airflow/pull/26188#issuecomment-1239360748

   @ashb Because the change was only in "www", the "Core" tests were skipped by "Selective checks". Apparently the "Core" tests as defined now have some implicit dependency on www code. I will trace it and fix it.




[GitHub] [airflow] potiuk commented on pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26188:
URL: https://github.com/apache/airflow/pull/26188#issuecomment-1239371520

   Yes. In this case there are two solutions (the first is way worse):
   
   * `test_dag_renderer` could be moved to "tests/www" because it mostly tests `dag_edges`, which is in airflow/www/views.py
   
   or (probably way better solution)
   
   * `dag_edges` (and corresponding methods) should be moved to "airflow/utils" because they are used by more than just views (they are used by both Views and Dot Renderer)
   
   I am happy to add a PR to do the latter. In this case, the failure simply uncovered an implicit dependency that we missed and that should be removed.
    




[GitHub] [airflow] ashb merged pull request #26188: Fix edge detection iteration

Posted by GitBox <gi...@apache.org>.
ashb merged PR #26188:
URL: https://github.com/apache/airflow/pull/26188

