Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/10 06:04:33 UTC

[GitHub] [airflow] VBhojawala opened a new pull request #12228: Taskgroup decorator

VBhojawala opened a new pull request #12228:
URL: https://github.com/apache/airflow/pull/12228


   Added a taskgroup decorator which can be used to create a TaskGroup from a Python callable. Inside the callable, tasks created with the task decorator can be grouped by calling them.
   
   - The taskgroup decorator accepts both positional and keyword arguments, which are passed to the constructor of the TaskGroup class.
   
   - A TaskGroup can be created with any of the following syntaxes:
   
   ``` python
   @taskgroup
   @taskgroup()
   @taskgroup(group_id='group_name')
   ```
   
   - The TaskGroup constructor takes one mandatory argument, group_id; if it is not given to the decorator, group_id is set to the name of the Python callable.
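   For illustration, a minimal sketch of how the default plays out (task_1 and task_2 are the tasks defined below; the custom id is just an example):
   
   ``` python
   @taskgroup
   def section_a(value):
       # group_id defaults to the function name: "section_a"
       return task_1(value)
   
   
   @taskgroup(group_id='custom_section')
   def section_b(value):
       # group_id is taken from the decorator argument: "custom_section"
       return task_2(value)
   ```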
   
   The following simple example demonstrates using the taskgroup decorator to group multiple tasks.
   
   ``` python
   @task
   def task_1(value):
     return f'[ Task1 {value} ]'
   
   
   @task
   def task_2(value):
     print(f'[ Task2 {value} ]')
   
   
   @taskgroup
   def section_1(value):
     return task_2(task_1(value))
   ```
   
   The taskgroup decorator uses the existing TaskGroup context manager under the hood, which means nested task groups can be created by nesting callables. The following example demonstrates the use of a nested taskgroup.
   
   ``` python
   @task
   def task_start():
       return '[Task_start]'
   
   @task
   def task_end():
        print('[ Task_End ]')
   
   @task
   def task_1(value):
       return f'[ Task1 {value} ]'
   
   @task
   def task_2(value):
       print(f'[ Task2 {value} ]')
   
   @task
   def task_3(value):
       return f'[ Task3 {value} ]'
   
   @task
   def task_4(value):
       print(f'[ Task4 {value} ]')
   
   @taskgroup
   def section_1(value):
   
       @taskgroup
       def section_2(value2):
           return task_4(task_3(value2))
   
       op1 = task_2(task_1(value))
       return section_2(op1) 
   ```
   
   Dedicated test cases for the taskgroup decorator are added in
   /tests/utils/test_task_group_decorator.py
   
   closes: #11870
   
   





[GitHub] [airflow] ashb commented on pull request #12228: taskgroup decorator (#11870)

ashb commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-749004546


   Does this actually need to be a decorator? How about this:
   
   ```python
   @task
    def collect_dataset(...):
     ...
   
   @task
    def train_model(...):
      ...
   
   @task_group
    def movielens_model_pipeline(learning_rate: float, feature_list: list, dataset: XComArg):
     dataset = filter_dataset(dataset, feature_list)
     train_model(dataset, learning_rate)
   
   @dag
   def movielens_hpt(dataset_path: str, feature_list:list=['year', 'director']):
     dataset = load_dataset(dataset_path)
   
     # NEW HERE
     with dag.taskgroup():
         for i in [x / 10 for x in range(1, 10)]:  # 0.1 .. 0.9
          movielens_model_pipeline(i, feature_list, dataset)
     
     decide_best_model(...)
   ```
   
   Or nested groups:
   
   ```python
    with dag.taskgroup() as group:
     with group.taskgroup(name="Inner group"):
         for i in [x / 10 for x in range(1, 10)]:  # 0.1 .. 0.9
          movielens_model_pipeline(i, feature_list, dataset)
   ```
   
   What do you think @VBhojawala @casassg ?
   





[GitHub] [airflow] github-actions[bot] commented on pull request #12228: taskgroup decorator

github-actions[bot] commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-724487412


   [The Workflow run](https://github.com/apache/airflow/actions/runs/355332680) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #12228: taskgroup decorator

github-actions[bot] commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-724525631


   [The Workflow run](https://github.com/apache/airflow/actions/runs/355424527) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] casassg commented on a change in pull request #12228: taskgroup decorator (#11870)

casassg commented on a change in pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#discussion_r547475486



##########
File path: airflow/utils/task_group.py
##########
@@ -344,3 +357,52 @@ def get_current_task_group(cls, dag: Optional["DAG"]) -> Optional[TaskGroup]:
                 return dag.task_group
 
         return cls._context_managed_task_group
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def taskgroup(python_callable: Optional[Callable] = None, *tg_args, **tg_kwargs) -> Callable[[T], T]:
+    """
+    Python TaskGroup decorator. Wraps a function into an Airflow TaskGroup.
+    Accepts kwargs for operator TaskGroup. Can be used to parametrize TaskGroup.
+
+    :param python_callable: Function to decorate
+    :param tg_args: Arguments for TaskGroup object
+    :type tg_args: list
+    :param tg_kwargs: Kwargs for TaskGroup object.
+    :type tg_kwargs: dict
+    """
+
+    def wrapper(f: T):
+        # Setting group_id as function name if not given
+        if len(tg_args) == 0 and 'group_id' not in tg_kwargs.keys():
+            tg_kwargs['group_id'] = f.__name__
+
+        # Get dag initializer signature and bind it to validate that task_group_args,
+        # and task_group_kwargs are correct
+        task_group_sig = signature(TaskGroup.__init__)
+        task_group_bound_args = task_group_sig.bind_partial(*tg_args, **tg_kwargs)
+
+        @functools.wraps(f)
+        def factory(*args, **kwargs):

Review comment:
       It may be nice to provide a specific kwarg for overriding the group_id.
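    For example, a rough sketch of one way the `factory` above could accept such an override; the keyword-only `group_id` parameter and the fallback behaviour are assumptions, not what this PR implements:
    
    ```python
    @functools.wraps(f)
    def factory(*args, group_id=None, **kwargs):
        # Hypothetical per-call override, e.g. section_1(value, group_id='custom');
        # falls back to the group_id captured at decoration time.
        call_kwargs = dict(tg_kwargs)
        if group_id is not None:
            call_kwargs['group_id'] = group_id
    
        f_sig = signature(f).bind(*args, **kwargs)
        f_sig.apply_defaults()
    
        with TaskGroup(**call_kwargs) as tg_obj:
            f(**f_sig.arguments)
        return tg_obj
    ```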

##########
File path: airflow/utils/task_group.py
##########
@@ -344,3 +357,52 @@ def get_current_task_group(cls, dag: Optional["DAG"]) -> Optional[TaskGroup]:
                 return dag.task_group
 
         return cls._context_managed_task_group
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def taskgroup(python_callable: Optional[Callable] = None, *tg_args, **tg_kwargs) -> Callable[[T], T]:
+    """
+    Python TaskGroup decorator. Wraps a function into an Airflow TaskGroup.
+    Accepts kwargs for operator TaskGroup. Can be used to parametrize TaskGroup.
+
+    :param python_callable: Function to decorate
+    :param tg_args: Arguments for TaskGroup object
+    :type tg_args: list
+    :param tg_kwargs: Kwargs for TaskGroup object.
+    :type tg_kwargs: dict
+    """
+
+    def wrapper(f: T):
+        # Setting group_id as function name if not given
+        if len(tg_args) == 0 and 'group_id' not in tg_kwargs.keys():
+            tg_kwargs['group_id'] = f.__name__

Review comment:
       Can we add logic to autogenerate the group_id if it already exists on the DAG?
   
    A taskgroup-decorated function can be invoked multiple times, and this should generate multiple task groups.
    
    We can follow a similar approach to the @task decorator, where we append `__1` to the end. We can probably reuse the same regex.
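    For illustration, a minimal sketch of that suffixing logic (the helper name and the `used_ids` lookup are hypothetical, not code from this PR):
    
    ```python
    import re
    
    def next_group_id(base_id, used_ids):
        # If "section_1" is taken, try "section_1__1", "section_1__2", ...,
        # mirroring the "__N" suffix convention of the @task decorator.
        core = re.split(r'__\d+$', base_id)[0]
        candidate = core
        counter = 1
        while candidate in used_ids:
            candidate = f'{core}__{counter}'
            counter += 1
        return candidate
    
    next_group_id('section_1', {'section_1'})  # -> 'section_1__1'
    ```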







[GitHub] [airflow] VBhojawala commented on a change in pull request #12228: taskgroup decorator

VBhojawala commented on a change in pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#discussion_r520462889



##########
File path: tests/utils/test_task_group_decorator.py
##########
@@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pendulum
+from airflow.models.dag import DAG
+from airflow.operators.python import PythonOperator, task
+from airflow.utils.task_group import taskgroup
+from airflow.www.views import task_group_to_dict
+
+
+def test_build_task_group_context_manager():
+    """
+    Tests Following :
+    1. Nested TaskGroup creation using taskgroup decorator should create same TaskGroup which can be created using
+    TaskGroup context manager.
+    2. TaskGroup consisting Tasks created using task decorator.
+    3. Node Ids of dags created with taskgroup decorator.
+    """
+
+    # Creating Tasks
+    @task
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    @task
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print(f'[ Task_End ]')
+
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        print(f'[ Task2 {value} ]')
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        return f'[ Task3 {value} ]'
+
+    @task
+    def task_4(value):
+        """ Dummy Task3"""
+        print(f'[ Task4 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+
+        @taskgroup()
+        def section_2(value2):
+            """ TaskGroup for grouping related Tasks"""
+            return task_4(task_3(value2))
+
+        op1 = task_2(task_1(value))
+        return section_2(op1)
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id="example_nested_task_group_decorator", start_date=execution_date, tags=["example"]
+    ) as dag:
+        t1 = task_start()
+        s1 = section_1(t1)
+        s1.set_downstream(task_end())
+
+    # Testing TaskGroup created using taskgroup decorator
+    assert set(dag.task_group.children.keys()) == {"task_start", "task_end", "section_1"}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_1',
+        'section_1.task_2',
+        'section_1.section_2',
+    }
+
+    # Testing TaskGroup consisting Tasks created using task decorator
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_2'].downstream_task_ids == {'section_1.section_2.task_3'}
+    assert dag.task_dict['section_1.section_2.task_4'].downstream_task_ids == {'task_end'}
+
+    def extract_node_id(node, include_label=False):
+        ret = {"id": node["id"]}
+        if include_label:
+            ret["label"] = node["value"]["label"]
+        if "children" in node:
+            children = []
+            for child in node["children"]:
+                children.append(extract_node_id(child, include_label=include_label))
+
+            ret["children"] = children
+        return ret
+
+    # Node IDs test
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.task_4'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_build_task_group_with_operators():
+    """  Tests DAG with Tasks created with *Operators and TaskGroup created with taskgroup decorator """
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print(f'[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup(group_id='section_1')
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+        s1 = section_1(t_start.output)
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        s1.set_downstream(t_end)
+
+    # Testing Tasks ing DAG
+    assert set(dag.task_group.children.keys()) == {'section_1', 'task_start', 'task_end'}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_2',
+        'section_1.task_3',
+        'section_1.task_1',
+    }
+
+    # Testing Tasks downstream
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}

Review comment:
       I made the suggested changes and added a new DAG for testing the TaskGroup context manager (section1) with a nested taskgroup decorator (section2).
   ![Screenshot from 2020-11-10 15-29-47](https://user-images.githubusercontent.com/11897651/98663662-5fc4b100-236f-11eb-94e4-924924c928e8.png)
   







[GitHub] [airflow] ashb commented on pull request #12228: taskgroup decorator (#11870)

ashb commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-749183064


   Yeah.
   
   Thinking more about the decorator form:
   
    ```python
   @taskgroup
   def section(value):
   ```
   
    For that to need to be a function, it probably needs to be able to override the TG name, i.e. this usage:
   
   ```python
       for i in ...:
           section(name=f'section_{i}')
   ```
   
    (We could of course have the same behaviour as for tasks, where we auto-apply a `__1` suffix etc. This could be what we do; I haven't looked at the code PR yet, sorry!)





[GitHub] [airflow] github-actions[bot] commented on pull request #12228: taskgroup decorator

github-actions[bot] commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-724629764


   [The Workflow run](https://github.com/apache/airflow/actions/runs/355702632) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] ashb commented on pull request #12228: taskgroup decorator (#11870)

ashb commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-749145135


   I think my example would do that too.
   
   I haven't explored if it's possible, but it should be. To me, task groups didn't feel like a decorator, but more like a context manager.
   
   We could do both if we want.





[GitHub] [airflow] VBhojawala commented on a change in pull request #12228: taskgroup decorator

VBhojawala commented on a change in pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#discussion_r520376963



##########
File path: tests/utils/test_task_group_decorator.py
##########
@@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pendulum
+from airflow.models.dag import DAG
+from airflow.operators.python import PythonOperator, task
+from airflow.utils.task_group import taskgroup
+from airflow.www.views import task_group_to_dict
+
+
+def test_build_task_group_context_manager():
+    """
+    Tests Following :
+    1. Nested TaskGroup creation using taskgroup decorator should create same TaskGroup which can be created using
+    TaskGroup context manager.
+    2. TaskGroup consisting Tasks created using task decorator.
+    3. Node Ids of dags created with taskgroup decorator.
+    """
+
+    # Creating Tasks
+    @task
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    @task
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print(f'[ Task_End ]')
+
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        print(f'[ Task2 {value} ]')
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        return f'[ Task3 {value} ]'
+
+    @task
+    def task_4(value):
+        """ Dummy Task3"""
+        print(f'[ Task4 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+
+        @taskgroup()
+        def section_2(value2):
+            """ TaskGroup for grouping related Tasks"""
+            return task_4(task_3(value2))
+
+        op1 = task_2(task_1(value))
+        return section_2(op1)
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id="example_nested_task_group_decorator", start_date=execution_date, tags=["example"]
+    ) as dag:
+        t1 = task_start()
+        s1 = section_1(t1)
+        s1.set_downstream(task_end())
+
+    # Testing TaskGroup created using taskgroup decorator
+    assert set(dag.task_group.children.keys()) == {"task_start", "task_end", "section_1"}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_1',
+        'section_1.task_2',
+        'section_1.section_2',
+    }
+
+    # Testing TaskGroup consisting Tasks created using task decorator
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_2'].downstream_task_ids == {'section_1.section_2.task_3'}
+    assert dag.task_dict['section_1.section_2.task_4'].downstream_task_ids == {'task_end'}
+
+    def extract_node_id(node, include_label=False):

Review comment:
       Ok, i will move it to test_task_group.py







[GitHub] [airflow] casassg commented on pull request #12228: taskgroup decorator (#11870)

casassg commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-749135819


   @ashb Ideally you get a new task group for every invocation of `movielens_model_pipeline` in order to get it grouped in the UI. I'm not sure how this relates to the examples; I see nested task groups in the example you mention.





[GitHub] [airflow] VBhojawala commented on a change in pull request #12228: taskgroup decorator

VBhojawala commented on a change in pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#discussion_r520375737



##########
File path: airflow/utils/task_group.py
##########
@@ -344,3 +346,51 @@ def get_current_task_group(cls, dag: Optional["DAG"]) -> Optional[TaskGroup]:
                 return dag.task_group
 
         return cls._context_managed_task_group
+
+
+def taskgroup(python_callable: Optional[Callable] = None, *tg_args, **tg_kwargs) -> Callable[[T], T]:
+    """
+    Python TaskGroup decorator. Wraps a function into an Airflow TaskGroup.
+    Accepts kwargs for operator TaskGroup. Can be used to parametrize TaskGroup.
+
+    :param python_callable: Function to decorate
+    :param tg_args: Arguments for TaskGroup object
+    :type tg_args: list
+    :param tg_kwargs: Kwargs for TaskGroup object.
+    :type tg_kwargs: dict
+    """
+
+    def wrapper(f: T):
+        # Setting group_id as function name if not given
+        if len(tg_args) == 0 and 'group_id' not in tg_kwargs.keys():
+            tg_kwargs['group_id'] = f.__name__
+
+        # Get dag initializer signature and bind it to validate that task_group_args,
+        # and task_group_kwargs are correct
+        task_group_sig = signature(TaskGroup.__init__)
+        task_group_bound_args = task_group_sig.bind_partial(*tg_args, **tg_kwargs)
+
+        @functools.wraps(f)
+        def factory(*args, **kwargs):
+            # Generate signature for decorated function and bind the arguments when called
+            # we do this to extract parameters so we can annotate them on the DAG object.
+            # In addition, this fails if we are missing any args/kwargs with TypeError as expected.
+            f_sig = signature(f).bind(*args, **kwargs)
+            # Apply defaults to capture default values if set.
+            f_sig.apply_defaults()
+
+            # Initialize TaskGroup with bound arguments
+            with TaskGroup(*task_group_bound_args.args, **task_group_bound_args.kwargs) as tg_obj:
+                # Invoke function to run Tasks inside the TaskGroup
+                f(**f_sig.arguments)
+
+            # Return task_group object such that it's accessible in Globals.
+            return tg_obj
+
+        return cast(T, factory)
+
+    if callable(python_callable):

Review comment:
       Yes, i will remove elif part.







[GitHub] [airflow] VBhojawala commented on pull request #12228: taskgroup decorator

VBhojawala commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-724544478


   > Thanks for the updates.
   > 
   > By the way, you don't have to open a new PR to make these changes. When you do this, the original history of the comments is left on the old PR.
   
   Earlier I raised the pull request from the fork directly. In order to work on multiple issues simultaneously, I closed the old PR and raised this one from a branch of the fork.





[GitHub] [airflow] casassg commented on pull request #12228: taskgroup decorator (#11870)

casassg commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-749146353


   You mean something like:
   
    ```python
   @dag
   def movielens_hpt(dataset_path: str, feature_list:list=['year', 'director']):
     dataset = load_dataset(dataset_path)
   
     # NEW HERE
      for i in [x / 10 for x in range(1, 10)]:  # 0.1 .. 0.9
       with dag.taskgroup(f'pipeline_{i}'):
          movielens_model_pipeline(i, feature_list, dataset)
     
     decide_best_model(...)
   ```
   
   I think this could work. I think it would be fun to have it generated automatically for you, but I agree that both approaches make sense.





[GitHub] [airflow] github-actions[bot] commented on pull request #12228: taskgroup decorator

github-actions[bot] commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-724527398


   [The Workflow run](https://github.com/apache/airflow/actions/runs/355427958) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] VBhojawala commented on pull request #12228: taskgroup decorator (#11870)

VBhojawala commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-754387934


   Oops, re-raising the pull request.





[GitHub] [airflow] github-actions[bot] commented on pull request #12228: taskgroup decorator

github-actions[bot] commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-724681162


   [The Workflow run](https://github.com/apache/airflow/actions/runs/355860124) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] yuqian90 commented on a change in pull request #12228: taskgroup decorator

yuqian90 commented on a change in pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#discussion_r520353635



##########
File path: tests/utils/test_task_group_decorator.py
##########
@@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pendulum
+from airflow.models.dag import DAG
+from airflow.operators.python import PythonOperator, task
+from airflow.utils.task_group import taskgroup
+from airflow.www.views import task_group_to_dict
+
+
+def test_build_task_group_context_manager():
+    """
+    Tests Following :
+    1. Nested TaskGroup creation using taskgroup decorator should create same TaskGroup which can be created using
+    TaskGroup context manager.
+    2. TaskGroup consisting Tasks created using task decorator.
+    3. Node Ids of dags created with taskgroup decorator.
+    """
+
+    # Creating Tasks
+    @task
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    @task
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print(f'[ Task_End ]')
+
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        print(f'[ Task2 {value} ]')
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        return f'[ Task3 {value} ]'
+
+    @task
+    def task_4(value):
+        """ Dummy Task3"""
+        print(f'[ Task4 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+
+        @taskgroup()
+        def section_2(value2):
+            """ TaskGroup for grouping related Tasks"""
+            return task_4(task_3(value2))
+
+        op1 = task_2(task_1(value))
+        return section_2(op1)
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id="example_nested_task_group_decorator", start_date=execution_date, tags=["example"]
+    ) as dag:
+        t1 = task_start()
+        s1 = section_1(t1)
+        s1.set_downstream(task_end())
+
+    # Testing TaskGroup created using taskgroup decorator
+    assert set(dag.task_group.children.keys()) == {"task_start", "task_end", "section_1"}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_1',
+        'section_1.task_2',
+        'section_1.section_2',
+    }
+
+    # Testing TaskGroup consisting Tasks created using task decorator
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_2'].downstream_task_ids == {'section_1.section_2.task_3'}
+    assert dag.task_dict['section_1.section_2.task_4'].downstream_task_ids == {'task_end'}
+
+    def extract_node_id(node, include_label=False):

Review comment:
       I suggest putting these tests inside the existing `test_task_group.py` instead so that things like `extract_node_id` can be reused. It also makes sense because they are closely related.

##########
File path: airflow/utils/task_group.py
##########
@@ -344,3 +346,51 @@ def get_current_task_group(cls, dag: Optional["DAG"]) -> Optional[TaskGroup]:
                 return dag.task_group
 
         return cls._context_managed_task_group
+
+
+def taskgroup(python_callable: Optional[Callable] = None, *tg_args, **tg_kwargs) -> Callable[[T], T]:
+    """
+    Python TaskGroup decorator. Wraps a function into an Airflow TaskGroup.
+    Accepts kwargs for operator TaskGroup. Can be used to parametrize TaskGroup.
+
+    :param python_callable: Function to decorate
+    :param tg_args: Arguments for TaskGroup object
+    :type tg_args: list
+    :param tg_kwargs: Kwargs for TaskGroup object.
+    :type tg_kwargs: dict
+    """
+
+    def wrapper(f: T):
+        # Setting group_id as function name if not given
+        if len(tg_args) == 0 and 'group_id' not in tg_kwargs.keys():
+            tg_kwargs['group_id'] = f.__name__
+
+        # Get dag initializer signature and bind it to validate that task_group_args,
+        # and task_group_kwargs are correct
+        task_group_sig = signature(TaskGroup.__init__)
+        task_group_bound_args = task_group_sig.bind_partial(*tg_args, **tg_kwargs)
+
+        @functools.wraps(f)
+        def factory(*args, **kwargs):
+            # Generate signature for decorated function and bind the arguments when called
+            # we do this to extract parameters so we can annotate them on the DAG object.
+            # In addition, this fails if we are missing any args/kwargs with TypeError as expected.
+            f_sig = signature(f).bind(*args, **kwargs)
+            # Apply defaults to capture default values if set.
+            f_sig.apply_defaults()
+
+            # Initialize TaskGroup with bound arguments
+            with TaskGroup(*task_group_bound_args.args, **task_group_bound_args.kwargs) as tg_obj:
+                # Invoke function to run Tasks inside the TaskGroup
+                f(**f_sig.arguments)
+
+            # Return task_group object such that it's accessible in Globals.
+            return tg_obj
+
+        return cast(T, factory)
+
+    if callable(python_callable):

Review comment:
       I think this is the same as providing just `tg_kwargs`? If there's no `*tg_args`, the user can't put in positional arguments after the callable. You shouldn't need to explicitly raise an `AirflowException` for this.
   
   ```python
   def taskgroup(python_callable: Optional[Callable], **tg_kwargs)
   ```
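    A fuller sketch of that kwargs-only shape, still supporting both `@taskgroup` and `@taskgroup(group_id=...)` (assumes the same `functools`/`TaskGroup` imports as the diff above; a sketch, not the PR's final code):
    
    ```python
    def taskgroup(python_callable=None, **tg_kwargs):
        def wrapper(f):
            # Default the group_id to the decorated function's name.
            tg_kwargs.setdefault('group_id', f.__name__)
    
            @functools.wraps(f)
            def factory(*args, **kwargs):
                with TaskGroup(**tg_kwargs) as tg_obj:
                    f(*args, **kwargs)
                return tg_obj
    
            return factory
    
        # Bare form: @taskgroup passes the function in directly.
        if callable(python_callable):
            return wrapper(python_callable)
        # Parametrized form: @taskgroup(...) returns the decorator.
        return wrapper
    ```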

##########
File path: tests/utils/test_task_group_decorator.py
##########
@@ -0,0 +1,189 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pendulum
+from airflow.models.dag import DAG
+from airflow.operators.python import PythonOperator, task
+from airflow.utils.task_group import taskgroup
+from airflow.www.views import task_group_to_dict
+
+
+def test_build_task_group_context_manager():
+    """
+    Tests Following :
+    1. Nested TaskGroup creation using taskgroup decorator should create same TaskGroup which can be created using
+    TaskGroup context manager.
+    2. TaskGroup consisting Tasks created using task decorator.
+    3. Node Ids of dags created with taskgroup decorator.
+    """
+
+    # Creating Tasks
+    @task
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    @task
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print(f'[ Task_End ]')
+
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        print(f'[ Task2 {value} ]')
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        return f'[ Task3 {value} ]'
+
+    @task
+    def task_4(value):
+        """ Dummy Task3"""
+        print(f'[ Task4 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+
+        @taskgroup()
+        def section_2(value2):
+            """ TaskGroup for grouping related Tasks"""
+            return task_4(task_3(value2))
+
+        op1 = task_2(task_1(value))
+        return section_2(op1)
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id="example_nested_task_group_decorator", start_date=execution_date, tags=["example"]
+    ) as dag:
+        t1 = task_start()
+        s1 = section_1(t1)
+        s1.set_downstream(task_end())
+
+    # Testing TaskGroup created using taskgroup decorator
+    assert set(dag.task_group.children.keys()) == {"task_start", "task_end", "section_1"}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_1',
+        'section_1.task_2',
+        'section_1.section_2',
+    }
+
+    # Testing TaskGroup consisting Tasks created using task decorator
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_2'].downstream_task_ids == {'section_1.section_2.task_3'}
+    assert dag.task_dict['section_1.section_2.task_4'].downstream_task_ids == {'task_end'}
+
+    def extract_node_id(node, include_label=False):
+        ret = {"id": node["id"]}
+        if include_label:
+            ret["label"] = node["value"]["label"]
+        if "children" in node:
+            children = []
+            for child in node["children"]:
+                children.append(extract_node_id(child, include_label=include_label))
+
+            ret["children"] = children
+        return ret
+
+    # Node IDs test
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.task_4'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_build_task_group_with_operators():
+    """  Tests DAG with Tasks created with *Operators and TaskGroup created with taskgroup decorator """
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print(f'[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup(group_id='section_1')
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+        s1 = section_1(t_start.output)
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        s1.set_downstream(t_end)
+
+    # Testing Tasks ing DAG
+    assert set(dag.task_group.children.keys()) == {'section_1', 'task_start', 'task_end'}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_2',
+        'section_1.task_3',
+        'section_1.task_1',
+    }
+
+    # Testing Tasks downstream
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}

Review comment:
       Maybe also mix `TaskGroup` context manager and decorator in the same DAG to test if they play nice together?
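    For instance, something along these lines, reusing the tasks above (an untested sketch assuming `TaskGroup` is imported alongside `taskgroup`; the dag_id and expected child ids are illustrative):
    
    ```python
    execution_date = pendulum.parse("20201109")
    with DAG(dag_id="example_mix_cm_and_decorator", start_date=execution_date) as dag:
        with TaskGroup("section_cm"):       # context-manager group
            section_1(task_1('x'))          # decorated group nested inside it
    
    # The decorated group should pick up the enclosing group's prefix.
    assert 'section_cm.section_1' in dag.task_group.children['section_cm'].children
    ```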







[GitHub] [airflow] VBhojawala commented on a change in pull request #12228: taskgroup decorator

VBhojawala commented on a change in pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#discussion_r521103735



##########
File path: tests/utils/test_task_group.py
##########
@@ -559,3 +560,239 @@ def test_duplicate_group_id():
             _ = DummyOperator(task_id="task1")
             with TaskGroup("group1"):
                 _ = DummyOperator(task_id="upstream_join_id")
+
+
+# taskgroup decorator tests
+
+
+def test_build_task_group_deco_context_manager():
+    """
+    Tests Following :
+    1. Nested TaskGroup creation using taskgroup decorator should create same TaskGroup which can be
+    created using TaskGroup context manager.
+    2. TaskGroup consisting Tasks created using task decorator.
+    3. Node Ids of dags created with taskgroup decorator.
+    """
+
+    # Creating Tasks
+    @task
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    @task
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End ]')
+
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        print(f'[ Task2 {value} ]')
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        return f'[ Task3 {value} ]'
+
+    @task
+    def task_4(value):
+        """ Dummy Task3"""
+        print(f'[ Task4 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+
+        @taskgroup()
+        def section_2(value2):
+            """ TaskGroup for grouping related Tasks"""
+            return task_4(task_3(value2))
+
+        op1 = task_2(task_1(value))
+        return section_2(op1)
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id="example_nested_task_group_decorator", start_date=execution_date, tags=["example"]
+    ) as dag:
+        t_start = task_start()
+        sec_1 = section_1(t_start)
+        sec_1.set_downstream(task_end())
+
+    # Testing TaskGroup created using taskgroup decorator
+    assert set(dag.task_group.children.keys()) == {"task_start", "task_end", "section_1"}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_1',
+        'section_1.task_2',
+        'section_1.section_2',
+    }
+
+    # Testing TaskGroup consisting Tasks created using task decorator
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_2'].downstream_task_ids == {'section_1.section_2.task_3'}
+    assert dag.task_dict['section_1.section_2.task_4'].downstream_task_ids == {'task_end'}
+
+    # Node IDs test
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.task_4'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_build_task_group_with_operators():
+    """  Tests DAG with Tasks created with *Operators and TaskGroup created with taskgroup decorator """
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup(group_id='section_1')

Review comment:
       Thank you for the suggestions; I have added taskgroup to ``decorators.py`` and updated the test case as suggested.
   







[GitHub] [airflow] casassg commented on pull request #12228: taskgroup decorator (#11870)

casassg commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-754066190


   I believe you may have rebased on the wrong commit; it seems the PR now includes changes from upstream.





[GitHub] [airflow] VBhojawala commented on a change in pull request #12228: taskgroup decorator (#11870)

VBhojawala commented on a change in pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#discussion_r550747918



##########
File path: airflow/utils/task_group.py
##########
@@ -344,3 +357,52 @@ def get_current_task_group(cls, dag: Optional["DAG"]) -> Optional[TaskGroup]:
                 return dag.task_group
 
         return cls._context_managed_task_group
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def taskgroup(python_callable: Optional[Callable] = None, *tg_args, **tg_kwargs) -> Callable[[T], T]:
+    """
+    Python TaskGroup decorator. Wraps a function into an Airflow TaskGroup.
+    Accepts kwargs for operator TaskGroup. Can be used to parametrize TaskGroup.
+
+    :param python_callable: Function to decorate
+    :param tg_args: Arguments for TaskGroup object
+    :type tg_args: list
+    :param tg_kwargs: Kwargs for TaskGroup object.
+    :type tg_kwargs: dict
+    """
+
+    def wrapper(f: T):
+        # Setting group_id as function name if not given
+        if len(tg_args) == 0 and 'group_id' not in tg_kwargs.keys():
+            tg_kwargs['group_id'] = f.__name__

Review comment:
       Sure, can you guide me on how to get the list of existing group_ids inside the taskgroup decorator?
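    One possible starting point, based on the context-manager code earlier in this file (an untested sketch; the `TaskGroupContext` lookup and the suffixing are assumptions):
    
    ```python
    # Inside factory(), before creating the TaskGroup:
    parent = TaskGroupContext.get_current_task_group(dag=None)
    used_ids = set(parent.children.keys()) if parent else set()
    if tg_kwargs['group_id'] in used_ids:
        # fall back to a "__N" suffix, as discussed above
        tg_kwargs['group_id'] = f"{tg_kwargs['group_id']}__1"
    ```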







[GitHub] [airflow] github-actions[bot] commented on pull request #12228: taskgroup decorator (#11870)

github-actions[bot] commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-753319162


   [The Workflow run](https://github.com/apache/airflow/actions/runs/456211152) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] VBhojawala commented on pull request #12228: taskgroup decorator

VBhojawala commented on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-724762482


   I think the ``area:Docs`` label attached by the bot is not accurate for this PR. Can it be corrected?





[GitHub] [airflow] casassg commented on a change in pull request #12228: taskgroup decorator

casassg commented on a change in pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#discussion_r520874186



##########
File path: tests/utils/test_task_group.py
##########
@@ -559,3 +560,239 @@ def test_duplicate_group_id():
             _ = DummyOperator(task_id="task1")
             with TaskGroup("group1"):
                 _ = DummyOperator(task_id="upstream_join_id")
+
+
+# taskgroup decorator tests
+
+
+def test_build_task_group_deco_context_manager():
+    """
+    Tests Following :
+    1. Nested TaskGroup creation using taskgroup decorator should create same TaskGroup which can be
+    created using TaskGroup context manager.
+    2. TaskGroup consisting Tasks created using task decorator.
+    3. Node Ids of dags created with taskgroup decorator.
+    """
+
+    # Creating Tasks
+    @task
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    @task
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End ]')
+
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        print(f'[ Task2 {value} ]')
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        return f'[ Task3 {value} ]'
+
+    @task
+    def task_4(value):
+        """ Dummy Task3"""
+        print(f'[ Task4 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+
+        @taskgroup()
+        def section_2(value2):
+            """ TaskGroup for grouping related Tasks"""
+            return task_4(task_3(value2))
+
+        op1 = task_2(task_1(value))
+        return section_2(op1)
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id="example_nested_task_group_decorator", start_date=execution_date, tags=["example"]
+    ) as dag:
+        t_start = task_start()
+        sec_1 = section_1(t_start)
+        sec_1.set_downstream(task_end())
+
+    # Testing TaskGroup created using taskgroup decorator
+    assert set(dag.task_group.children.keys()) == {"task_start", "task_end", "section_1"}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_1',
+        'section_1.task_2',
+        'section_1.section_2',
+    }
+
+    # Testing TaskGroup consisting Tasks created using task decorator
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_2'].downstream_task_ids == {'section_1.section_2.task_3'}
+    assert dag.task_dict['section_1.section_2.task_4'].downstream_task_ids == {'task_end'}
+
+    # Node IDs test
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.task_4'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_build_task_group_with_operators():
+    """  Tests DAG with Tasks created with *Operators and TaskGroup created with taskgroup decorator """
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup(group_id='section_1')

Review comment:
       Maybe change the name of the group_id or of the function to make sure the override is working properly.
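
       For example, a sketch (``section_renamed`` is a hypothetical name) where the function name and the explicit ``group_id`` deliberately differ, so the test can assert the group is keyed ``'section_1'`` rather than by the function name:

   ```py
   @taskgroup(group_id='section_1')
   def section_renamed(value):
       """Resulting group should be keyed 'section_1', not 'section_renamed'."""
       return task_3(task_2(task_1(value)))
   ```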




----------------------------------------------------------------



[GitHub] [airflow] VBhojawala commented on a change in pull request #12228: taskgroup decorator (#11870)

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on a change in pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#discussion_r550767798



##########
File path: airflow/utils/task_group.py
##########
@@ -344,3 +357,52 @@ def get_current_task_group(cls, dag: Optional["DAG"]) -> Optional[TaskGroup]:
                 return dag.task_group
 
         return cls._context_managed_task_group
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def taskgroup(python_callable: Optional[Callable] = None, *tg_args, **tg_kwargs) -> Callable[[T], T]:
+    """
+    Python TaskGroup decorator. Wraps a function into an Airflow TaskGroup.
+    Accepts args and kwargs for the TaskGroup constructor, which can be used to parametrize the TaskGroup.
+
+    :param python_callable: Function to decorate
+    :param tg_args: Arguments for TaskGroup object
+    :type tg_args: list
+    :param tg_kwargs: Kwargs for TaskGroup object.
+    :type tg_kwargs: dict
+    """
+
+    def wrapper(f: T):
+        # Setting group_id as function name if not given
+        if len(tg_args) == 0 and 'group_id' not in tg_kwargs.keys():
+            tg_kwargs['group_id'] = f.__name__

Review comment:
       I have added auto-generation of the group_id when it already exists, the same as the ``task`` decorator does, and I have also added a test case for it.
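
       A minimal sketch of the intended behaviour (import paths and the exact suffix are assumptions; the suffix mirrors how ``@task`` deduplicates task_ids):

   ```py
   from airflow import DAG
   from airflow.operators.python import task       # import path assumed
   from airflow.utils.dates import days_ago
   from airflow.utils.task_group import taskgroup  # decorator added in this PR

   @task
   def task_1():
       return '[ Task1 ]'

   @taskgroup
   def section_1():
       task_1()

   with DAG('example_dedup', start_date=days_ago(1)):
       section_1()  # first call creates group 'section_1'
       section_1()  # duplicate: id is auto-generated, e.g. 'section_1__1' (assumed)
   ```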




----------------------------------------------------------------



[GitHub] [airflow] casassg edited a comment on pull request #12228: taskgroup decorator (#11870)

Posted by GitBox <gi...@apache.org>.
casassg edited a comment on pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#issuecomment-749146353


   You mean something like:
   
   ```py
   @dag
   def movielens_hpt(dataset_path: str, feature_list: list = ['year', 'director']):
     dataset = load_dataset(dataset_path)
   
     # NEW HERE
     for i in [x / 10 for x in range(1, 9)]:  # float steps: range() only takes ints
       with dag.taskgroup(f'pipeline_{i}'):
          movielens_model_pipeline(i, feature_list, dataset)
     
     decide_best_model(...)
   ```
   
   I think this could work. I think it would be fun to have it be generated automatically for you, but agree that both approaches make sense.
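
   For comparison, a rough equivalent with the decorator from this PR (sketch only; assumes the loop sits inside ``movielens_hpt`` where ``feature_list`` and ``dataset`` are defined, and that ``group_id`` can be set per iteration):

   ```py
   for i in [x / 10 for x in range(1, 9)]:
       @taskgroup(group_id=f'pipeline_{i}')
       def pipeline(threshold):
           movielens_model_pipeline(threshold, feature_list, dataset)

       pipeline(i)
   ```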


----------------------------------------------------------------



[GitHub] [airflow] VBhojawala commented on a change in pull request #12228: taskgroup decorator (#11870)

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on a change in pull request #12228:
URL: https://github.com/apache/airflow/pull/12228#discussion_r550748169



##########
File path: airflow/utils/task_group.py
##########
@@ -344,3 +357,52 @@ def get_current_task_group(cls, dag: Optional["DAG"]) -> Optional[TaskGroup]:
                 return dag.task_group
 
         return cls._context_managed_task_group
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def taskgroup(python_callable: Optional[Callable] = None, *tg_args, **tg_kwargs) -> Callable[[T], T]:
+    """
+    Python TaskGroup decorator. Wraps a function into an Airflow TaskGroup.
+    Accepts args and kwargs for the TaskGroup constructor, which can be used to parametrize the TaskGroup.
+
+    :param python_callable: Function to decorate
+    :param tg_args: Arguments for TaskGroup object
+    :type tg_args: list
+    :param tg_kwargs: Kwargs for TaskGroup object.
+    :type tg_kwargs: dict
+    """
+
+    def wrapper(f: T):
+        # Setting group_id as function name if not given
+        if len(tg_args) == 0 and 'group_id' not in tg_kwargs.keys():
+            tg_kwargs['group_id'] = f.__name__
+
+        # Get the TaskGroup initializer signature and bind it to validate that tg_args
+        # and tg_kwargs are correct
+        task_group_sig = signature(TaskGroup.__init__)
+        task_group_bound_args = task_group_sig.bind_partial(*tg_args, **tg_kwargs)
+
+        @functools.wraps(f)
+        def factory(*args, **kwargs):

Review comment:
       Currently, for the TaskGroup constructor, group_id is by default the first positional argument, or it can be a keyword argument.
   The same can be done with the current implementation of the taskgroup decorator.
   
   Are you suggesting placing group_id as an optional keyword argument in the decorator signature?
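
   e.g. both of the already-supported styles (sketch; ``section_a``/``section_b`` are placeholder names):

   ```py
   @taskgroup                         # group_id defaults to the function name
   def section_a():
       ...

   @taskgroup(group_id='section_1')   # explicit group_id as a keyword argument
   def section_b():
       ...
   ```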




----------------------------------------------------------------



[GitHub] [airflow] VBhojawala closed pull request #12228: taskgroup decorator (#11870)

Posted by GitBox <gi...@apache.org>.
VBhojawala closed pull request #12228:
URL: https://github.com/apache/airflow/pull/12228


   


----------------------------------------------------------------