Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/19 18:58:29 UTC

[GitHub] [airflow] Gollum999 opened a new issue, #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Gollum999 opened a new issue, #25165:
URL: https://github.com/apache/airflow/issues/25165

   ### Apache Airflow version
   
   2.3.3 (latest released)
   
   ### What happened
   
   As the title states, if you have dynamically mapped tasks inside of a `TaskGroup`, those tasks do not get the `group_id` prepended to their respective `task_id`s.  This causes at least a couple of undesirable side effects:
   
   1. Task names are truncated in Grid/Graph* View.  The tasks below are named `plus_one` and `plus_two`:
   
   ![Screenshot from 2022-07-19 13-29-05](https://user-images.githubusercontent.com/7269927/179826453-a4293c14-2a83-4739-acf2-8b378e4e85e9.png)
   ![Screenshot from 2022-07-19 13-47-47](https://user-images.githubusercontent.com/7269927/179826442-b9e3d24d-52ff-49fc-a8cc-fe1cb5143bcb.png)
   
   Presumably this is because the UI normally strips off the `group_id` prefix.
   
   \* Graph View was very inconsistent in my experience.  Sometimes the names are truncated, and sometimes they render correctly.  I haven't figured out the pattern behind this behavior.
   
   2. Duplicate `task_id`s between groups result in an `airflow.exceptions.DuplicateTaskIdFound`, even if the `group_id` would normally disambiguate them.
   
   
   ### What you think should happen instead
   
   These dynamic tasks inside of a group should have the `group_id` prepended for consistent behavior.
   
   ### How to reproduce
   
   ```
   #!/usr/bin/env python3
   import datetime
   
   from airflow.decorators import dag, task
   from airflow.utils.task_group import TaskGroup
   
   
   @dag(
       start_date=datetime.datetime(2022, 7, 19),
       schedule_interval=None,
   )
   def test_dag():
       with TaskGroup(group_id='group'):
           @task
           def plus_one(x: int):
               return x + 1
   
           plus_one.expand(x=[1, 2, 3])
   
       with TaskGroup(group_id='ggg'):
           @task
           def plus_two(x: int):
               return x + 2
   
           plus_two.expand(x=[1, 2, 3])
   
   
   dag = test_dag()
   
   
   if __name__ == '__main__':
       dag.cli()
   ```
   
   ### Operating System
   
   CentOS Stream 8
   
   ### Versions of Apache Airflow Providers
   
   N/A
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Standalone
   
   ### Anything else
   
   Possibly related: #12309
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] bbovenzi commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1208336591

   @EdwardRadical Will your change fix just the id or the label too? If it's both then I say go for it.




[GitHub] [airflow] EdwardRadical commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
EdwardRadical commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1201848083

   Same core issue as https://github.com/apache/airflow/issues/23621.
   
   >This logic already exists; a @task-decorated task calls get_unique_task_id, which calls task_group.child_id. The root cause will be deeper than this.
   
   As far as I can tell, this is not correct. This is the cause I found back when I opened that issue:
   
   > At the end of the day it seems that 
   > ```
   >         if task_group:
   >             self.task_id = task_group.child_id(task_id)
   > ```
   > is only executed by subclasses of `BaseOperator`. `MappedOperator` does not inherit from it, so it never executes the line that would correctly prefix the task_id; hence the duplicate key error thrown later, when `self.dag.add_task` is called. A simple fix might be to add those two lines to `MappedOperator` too, although other places also modify the task_id.




[GitHub] [airflow] erdos2n commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
erdos2n commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1191963111

   Can I take this @bbovenzi ?




[GitHub] [airflow] uranusjr closed issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
uranusjr closed issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id
URL: https://github.com/apache/airflow/issues/25165




[GitHub] [airflow] uranusjr commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1202029181

   MappedOperator does not call the function on its own, but receives the already-converted `task_id` when either `partial` or `expand` is called on the BaseOperator. Search for `child_id` and `get_unique_task_id` in the code base, and you will find them called when a task is expanded.




[GitHub] [airflow] bbovenzi commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1191967441

   @erdos2n go for it!




[GitHub] [airflow] potiuk commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1203153953

   @andreafromcape - since you understand how things work, I urge you to provide a PR fixing the problem and a unit test for it. This will make for a far more productive discussion once you have gone through the deep dive there and checked exactly how it works. I am not saying anything said here is correct/not correct, but in any case you seem to be smart and knowledgeable, so the best way to show it (and help everyone) is to provide a fix. Airflow is created by > 2100 contributors, and you can become one if you think you can help others.
   
   I think this will be far more productive than pointing out who is wrong or right.




[GitHub] [airflow] Gollum999 commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
Gollum999 commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1191470152

   I appreciate the fast response, but could we re-open this?  #25217 only fixes one of the three symptoms that I mentioned above.
   
   I think the "root" fix is to prepend the `group_id` to `MappedOperator.task_id`, like what `BaseOperator` does:
   ```
   if task_group:
       self.task_id = task_group.child_id(task_id)
   else:
       self.task_id = task_id
   ```




[GitHub] [airflow] EdwardRadical commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
EdwardRadical commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1203698383

   No, @uranusjr, there is more to it. This is the complete snippet:
   
   ```
       task_group = task_group or TaskGroupContext.get_current_task_group(dag)
       tg_task_id = task_group.child_id(task_id) if task_group else task_id
   
       if tg_task_id not in dag.task_ids:
           return task_id
   ```
   
   This happens at the time `expand` is called: although `tg_task_id` does contain the correct label, it is ignored because `tg_task_id not in dag.task_ids` evaluates to `True` (for obvious reasons), so the naked `task_id` is returned by `get_unique_task_id`.
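
The behaviour described above can be illustrated without Airflow installed. The following is a minimal sketch that models only the branch quoted in the snippet; `StubTaskGroup` and `unique_task_id_sketch` are hypothetical stand-ins, and the `"<group_id>.<label>"` prefix format is an assumption about what `child_id` produces. Whatever the real function does when the id already exists is deliberately not reproduced.

```python
from typing import Optional, Set


class StubTaskGroup:
    """Hypothetical stand-in for TaskGroup; only child_id is modeled."""

    def __init__(self, group_id: str) -> None:
        self.group_id = group_id

    def child_id(self, label: str) -> str:
        # Assumed prefix format: "<group_id>.<label>".
        return f"{self.group_id}.{label}"


def unique_task_id_sketch(
    task_id: str,
    dag_task_ids: Set[str],
    task_group: Optional[StubTaskGroup],
) -> str:
    """Model of the quoted branch only, not of the whole Airflow function."""
    tg_task_id = task_group.child_id(task_id) if task_group else task_id
    if tg_task_id not in dag_task_ids:
        # The prefixed id is used for the membership test only;
        # the value handed back is the unprefixed one.
        return task_id
    raise NotImplementedError("collision handling is outside the quoted snippet")


group = StubTaskGroup("group")
result = unique_task_id_sketch("plus_one", set(), group)
print(result)  # plus_one
```

In this model the prefixed id `group.plus_one` is computed but discarded, which matches the claim that the naked `task_id` is what the caller receives.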
   
   And this was probably a rough two-liner to fix the obvious collision [with this other snippet](https://github.com/apache/airflow/blob/3138604b264878f27505223bd14c7814eacc1e57/airflow/models/baseoperator.py#L759-L762) inside `BaseOperator`'s `__init__`, which calls the exact same function (`task_group.child_id(task_id)`):
   
   ```
           if task_group:
               self.task_id = task_group.child_id(task_id)
           else:
               self.task_id = task_id
   ```
   
   So while `get_unique_task_id` does **_not_** return a unique task_id when the task belongs to a group, in most cases the task_id is fixed later by `BaseOperator`. The snippet above, however, is not present in `MappedOperator`'s `__init__`, and `MappedOperator` does not inherit from `BaseOperator` either, so it never runs that `__init__`. As a consequence, the result is a duplicate.
   
   I am explaining this to you, @uranusjr, even though I know where the issue is, how it manifests, and can maybe patch it, because I recognize that I am not the one with the most knowledge of the environment here, and any fix I create will most likely not be in the spirit of the architecture. As a major contributor to AIP-42, _you_ are the one with expert knowledge. So either I keep trying to convince you for a couple more messages, give up, and create a PR out of my flawed knowledge, or, in the spirit of OSS, you hear me out and I find the best way to fix it without introducing dangerous assumptions or unsound logic.
   
   Given how the bug seems to happen, this is what would be needed to fix the issue, to the best of my knowledge:
   
   ```
   ===================================================================
   diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py
   --- a/airflow/decorators/base.py	(revision 298be502c35006b7c3f011b676dbb4db0633bc74)
   +++ b/airflow/decorators/base.py	(date 1659517989535)
   @@ -359,6 +359,9 @@
            partial_kwargs.update(task_kwargs)
    
            task_id = get_unique_task_id(partial_kwargs.pop("task_id"), dag, task_group)
   +        if task_group:
   +            task_id = task_group.child_id(task_id)
   +
            params = partial_kwargs.pop("params", None) or default_params
    
            # Logic here should be kept in sync with BaseOperatorMeta.partial().
   ```
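
To make the intent of the diff above concrete, here is a small pure-Python sketch of the id resolution with and without the two added lines. It does not import Airflow: `child_id`, `resolve_task_id` and `register_in_two_groups` are hypothetical helpers, the `"<group_id>.<label>"` format is an assumption, and only the branch of `get_unique_task_id` quoted earlier in this comment is modeled. It is an illustration of the proposal, not a statement about what was eventually merged.

```python
from typing import List, Optional, Set, Tuple


def child_id(group_id: str, label: str) -> str:
    # Assumed prefix format: "<group_id>.<label>".
    return f"{group_id}.{label}"


def resolve_task_id(
    task_id: str, registered: Set[str], group_id: Optional[str], patched: bool
) -> str:
    # Step 1: the quoted check; the prefixed id is only tested, not returned.
    prefixed = child_id(group_id, task_id) if group_id else task_id
    if prefixed in registered:
        raise NotImplementedError("collision branch is not modeled here")
    resolved = task_id
    # Step 2: the two lines added by the diff, applied only when patched=True.
    if patched and group_id:
        resolved = child_id(group_id, resolved)
    return resolved


def register_in_two_groups(patched: bool) -> Tuple[List[str], List[str]]:
    """Register a task named plus_one in two groups; report ids and collisions."""
    registered: Set[str] = set()
    collisions: List[str] = []
    for group_id in ("group", "ggg"):
        resolved = resolve_task_id("plus_one", registered, group_id, patched)
        if resolved in registered:
            collisions.append(resolved)
        registered.add(resolved)
    return sorted(registered), collisions


print(register_in_two_groups(patched=False))  # (['plus_one'], ['plus_one'])
print(register_in_two_groups(patched=True))   # (['ggg.plus_one', 'group.plus_one'], [])
```

Without the extra step, both groups end up asking for the same bare id; with it, each group contributes a distinct prefixed id.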
   
   Let me know what you think of it.




[GitHub] [airflow] bbovenzi commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1191359768

   Ok I found the issue. If the groups are expanded on the initial page load the labels are correct. If a user clicks to expand a group the labels are not being updated. I'm creating a PR to fix this now.




[GitHub] [airflow] bbovenzi closed issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
bbovenzi closed issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id
URL: https://github.com/apache/airflow/issues/25165




[GitHub] [airflow] erdos2n commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
erdos2n commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1191975901

   I don't know if I can assign myself...




[GitHub] [airflow] uranusjr commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1192167588

   > I think the "root" fix is to prepend the group_id to MappedOperator.task_id, like what BaseOperator does
   
   This logic already exists; a `@task`-decorated task calls `get_unique_task_id`, which calls `task_group.child_id`. The root cause will be deeper than this.




[GitHub] [airflow] andreafromcape commented on issue #25165: Dynamic Tasks inside of TaskGroup do not have group_id prepended to task_id

Posted by GitBox <gi...@apache.org>.
andreafromcape commented on issue #25165:
URL: https://github.com/apache/airflow/issues/25165#issuecomment-1202350973

   No. I already went through this once with you in the issue I referenced but I must assume I was completely ignored. This is the complete snippet:
   
   ```
       task_group = task_group or TaskGroupContext.get_current_task_group(dag)
       tg_task_id = task_group.child_id(task_id) if task_group else task_id
   
       if tg_task_id not in dag.task_ids:
           return task_id
   ```
   
   This happens at the time `expand` is called: although `tg_task_id` does contain the correct label, it is ignored because `tg_task_id not in dag.task_ids` evaluates to `True` (for obvious reasons, I hope), so the naked `task_id` is returned by `get_unique_task_id`.
   
   And again, as I said in the referenced issue, this was probably a rough two-liner to fix the obvious collision [with this other snippet](https://github.com/apache/airflow/blob/3138604b264878f27505223bd14c7814eacc1e57/airflow/models/baseoperator.py#L759-L762) inside `BaseOperator`'s `__init__`, which does the exact same thing (prepending the task_group_id to the task_id):
   
   ```
           if task_group:
               self.task_id = task_group.child_id(task_id)
           else:
               self.task_id = task_id
   ```
   
   So while `get_unique_task_id` does **_not_** return a unique task_id in some cases, in most cases this is fixed later by `BaseOperator`. The fix above, however, is not present in `MappedOperator`'s `__init__`, and `MappedOperator` does not inherit from `BaseOperator` either, so the task_id is never fixed and the result is a duplicate.
   
   So, to summarize:
   
   - `get_unique_task_id` does _not_ return a unique task_id when a task is inside a task group
   - This is normally fixed later by `BaseOperator` at initialization, which will correctly prepend the task_group_id
   - **_But_** MappedOperator does not fix the task_id as `BaseOperator` does
   - Hence, we have a duplicate task_id in case the `@task` decorator is used inside a task_group
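
The four points above can be walked through with stand-in classes. This is a sketch under stated assumptions, not Airflow code: `MiniDag`, `DuplicateTaskId`, `BaseOperatorSketch` and `MappedOperatorSketch` are hypothetical names, `DuplicateTaskId` only mimics the role of `DuplicateTaskIdFound`, and the `"<group_id>.<task_id>"` prefix format is assumed.

```python
from typing import List, Optional, Set, Union


class DuplicateTaskId(Exception):
    """Hypothetical analogue of airflow.exceptions.DuplicateTaskIdFound."""


class MiniDag:
    """Hypothetical registry that rejects a task_id it has already seen."""

    def __init__(self) -> None:
        self.task_ids: Set[str] = set()

    def add_task(self, task_id: str) -> None:
        if task_id in self.task_ids:
            raise DuplicateTaskId(task_id)
        self.task_ids.add(task_id)


class BaseOperatorSketch:
    """Prefixes the id in __init__, like the quoted BaseOperator snippet."""

    def __init__(self, task_id: str, dag: MiniDag, group_id: Optional[str] = None) -> None:
        self.task_id = f"{group_id}.{task_id}" if group_id else task_id
        dag.add_task(self.task_id)


class MappedOperatorSketch:
    """Deliberately not a subclass: it keeps whatever id it was handed."""

    def __init__(self, task_id: str, dag: MiniDag, group_id: Optional[str] = None) -> None:
        self.task_id = task_id  # no prefix applied here
        dag.add_task(self.task_id)


def build(operator_cls) -> Union[List[str], str]:
    """Create a task named plus_one in two groups and report the outcome."""
    dag = MiniDag()
    try:
        for group_id in ("group", "ggg"):
            operator_cls("plus_one", dag, group_id)
    except DuplicateTaskId as exc:
        return f"duplicate: {exc}"
    return sorted(dag.task_ids)


print(build(BaseOperatorSketch))    # ['ggg.plus_one', 'group.plus_one']
print(build(MappedOperatorSketch))  # duplicate: plus_one
```

The only difference between the two stand-ins is the prefixing line in `__init__`, which is the difference the summary attributes to `BaseOperator` versus `MappedOperator`.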
   
   I urge you to re-read the code, @uranusjr, because it does _not_ do what you think it does.

