Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/05 05:14:38 UTC

[GitHub] [airflow] VBhojawala opened a new pull request #13479: Added taskgroup decorator

VBhojawala opened a new pull request #13479:
URL: https://github.com/apache/airflow/pull/13479


   Added a taskgroup decorator that creates a TaskGroup from a Python callable. Inside the callable, tasks are grouped by calling task functions (Python callables) created with the task decorator.
   
   
   - The taskgroup decorator takes optional positional and keyword arguments, which are passed to the constructor of the TaskGroup class.
   
   - A TaskGroup can be created with any of the following syntaxes:
   
   ``` python
   @taskgroup
   @taskgroup()
   @taskgroup(group_id='group_name')
   ```
   
   - The TaskGroup class constructor takes one mandatory argument, group_id. If it is not given in the decorator, group_id is set to the name of the Python callable.
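
As a rough illustration of how one decorator can support all three call forms and fall back to the function name, here is a minimal sketch. It does not use Airflow: `StubTaskGroup` is a hypothetical stand-in for the real `TaskGroup` class, and this `taskgroup` accepts keyword arguments only, so it is not the PR's actual implementation.

```python
import functools


class StubTaskGroup:
    """Hypothetical stand-in for TaskGroup; it only records its arguments."""

    def __init__(self, group_id, **kwargs):
        self.group_id = group_id
        self.kwargs = kwargs


def taskgroup(python_callable=None, **tg_kwargs):
    """Sketch of a decorator usable as @taskgroup, @taskgroup() or @taskgroup(group_id=...)."""

    def wrapper(func):
        # Default group_id to the function name unless the caller supplied one.
        group_kwargs = {"group_id": func.__name__, **tg_kwargs}

        @functools.wraps(func)
        def factory(*args, **kwargs):
            group = StubTaskGroup(**group_kwargs)
            func(*args, **kwargs)
            return group

        return factory

    if callable(python_callable):
        # Bare form: @taskgroup passes the function directly.
        return wrapper(python_callable)
    # Called form: @taskgroup() or @taskgroup(group_id=...).
    return wrapper


@taskgroup
def section_1():
    pass


@taskgroup()
def section_2():
    pass


@taskgroup(group_id='group_name')
def section_3():
    pass


print(section_1().group_id, section_2().group_id, section_3().group_id)
# prints: section_1 section_2 group_name
```

The bare form works because Python passes the decorated function as the first positional argument, which the sketch detects with `callable()`.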
   
   The following simple example demonstrates using the taskgroup decorator to group multiple tasks.
   
   ``` python
   @task
   def task_1(value):
     return f'[ Task1 {value} ]'
   
   
   @task
   def task_2(value):
     print(f'[ Task2 {value} ]')
   
   
   @taskgroup
   def section_1(value):
     return task_2(task_1(value))
   ```
   
   The taskgroup decorator uses the existing TaskGroup context manager, which means nested TaskGroups can be created by nesting callables. The following example demonstrates a nested taskgroup.
   
   ``` python
   @task
   def task_start():
       return '[Task_start]'
   
   @task
   def task_end():
       print('[ Task_End ]')
   
   @task
   def task_1(value):
       return f'[ Task1 {value} ]'
   
   @task
   def task_2(value):
       print(f'[ Task2 {value} ]')
   
   @task
   def task_3(value):
       return f'[ Task3 {value} ]'
   
   @task
   def task_4(value):
       print(f'[ Task4 {value} ]')
   
   @taskgroup
   def section_1(value):
   
       @taskgroup
       def section_2(value2):
           return task_4(task_3(value2))
   
       op1 = task_2(task_1(value))
       return section_2(op1) 
   ```
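
To make the nesting behaviour concrete, the sketch below shows why wrapping a context manager in a decorator gives nested groups with prefixed ids. It runs without Airflow: `StubTaskGroup`, `add_task` and `registry` are hypothetical stand-ins (not Airflow's API), and the id prefixing only imitates the `section_1.section_2.task_3` style ids that the PR's tests expect.

```python
import functools

registry = []  # every fully qualified task id, in creation order


class StubTaskGroup:
    """Hypothetical stand-in for TaskGroup: a context manager that prefixes child ids."""

    _stack = []  # currently open groups, innermost last

    def __init__(self, group_id):
        parent = StubTaskGroup._stack[-1] if StubTaskGroup._stack else None
        self.group_id = f"{parent.group_id}.{group_id}" if parent else group_id
        self.children = []
        if parent:
            parent.children.append(self.group_id)

    def __enter__(self):
        StubTaskGroup._stack.append(self)
        return self

    def __exit__(self, *exc):
        StubTaskGroup._stack.pop()
        return False


def add_task(task_id):
    """Stand-in for calling a @task function; must be called inside an open group."""
    group = StubTaskGroup._stack[-1]
    full_id = f"{group.group_id}.{task_id}"
    group.children.append(full_id)
    registry.append(full_id)
    return full_id


def taskgroup(func):
    """Sketch: run the callable inside a group named after it."""

    @functools.wraps(func)
    def factory(*args, **kwargs):
        with StubTaskGroup(group_id=func.__name__) as group:
            func(*args, **kwargs)
        return group

    return factory


@taskgroup
def section_1():
    add_task("task_1")
    add_task("task_2")

    @taskgroup
    def section_2():
        add_task("task_3")
        add_task("task_4")

    section_2()


group = section_1()
print(group.children)
# prints: ['section_1.task_1', 'section_1.task_2', 'section_1.section_2']
print(registry[-1])
# prints: section_1.section_2.task_4
```

Because the inner callable runs while the outer group's context is still open, the inner group picks up the outer group as its parent without any extra wiring.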
   
   Dedicated test cases for the taskgroup decorator are in the file
   /tests/utils/test_task_group_decorator.py
   
   
   Recent changes 
   
   - Added logic to append suffix to duplicate group_id.
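
The suffixing rule can be sketched as a standalone function. This is an illustration modelled on the `__N` suffix scheme shown in the PR's diff of `airflow/utils/task_group.py`; the function name `next_group_id` and its signature are assumptions made for this example and are not part of Airflow.

```python
import re


def next_group_id(group_id, used_group_ids):
    """Return group_id if unused, otherwise append or increment a '__N' suffix."""
    if group_id not in used_group_ids:
        return group_id

    # Strip an existing numeric suffix so 'tg__2' and 'tg' share the base 'tg'.
    base = re.split(r"__\d+$", group_id)[0]
    pattern = re.compile(rf"^{re.escape(base)}__(\d+)$")

    suffixes = []
    for used_id in used_group_ids:
        if used_id is None:  # the root group has no id
            continue
        match = pattern.match(used_id)
        if match:
            suffixes.append(int(match.group(1)))

    if not suffixes:
        return f"{group_id}__1"
    # Continue from the largest suffix already in use.
    return f"{base}__{max(suffixes) + 1}"


used = {None}
for _ in range(3):
    new_id = next_group_id("task_group1", used)
    used.add(new_id)
    print(new_id)
# prints: task_group1, task_group1__1, task_group1__2 (one per line)
```

Incrementing from the largest used suffix, rather than filling gaps, keeps ids in creation order even if some suffixed ids were assigned explicitly.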
   
   closes: #11870
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r575665958



##########
File path: docs/apache-airflow/concepts.rst
##########
@@ -1121,6 +1121,26 @@ This animated gif shows the UI interactions. TaskGroups are expanded or collapse
 .. image:: img/task_group.gif
 
 
+TaskGroup can be created using ``@taskgroup decorator``, it takes one mandatory argument ``group_id`` which is same as constructor of TaskGroup class, if not given it copies function name as ``group_id``. It works exactly same as creating TaskGroup using context manager ``with TaskGroup('groupid') as section:``.
+
+.. code-block:: python
+
+  @task
+  def task_1(value):
+      return f'[ Task1 {value} ]'
+
+
+  @task
+  def task_2(value):
+      print(f'[ Task2 {value} ]')
+
+
+  @taskgroup
+  def section_1(value):
+      return task_2(task_1(value))

Review comment:
       Should we use `exampleinclude` to pull the code from an example DAG instead of hardcoding it in the docs?
   
   ```
   .. exampleinclude:: /../../airflow/providers/apache/beam/example_dags/example_beam.py
       :language: python
       :dedent: 4
       :start-after: [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
       :end-before: [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
   ```







[GitHub] [airflow] VBhojawala commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r560200369



##########
File path: airflow/utils/task_group.py
##########
@@ -42,7 +56,7 @@ class TaskGroup(TaskMixin):
     :param prefix_group_id: If set to True, child task_id and group_id will be prefixed with
         this TaskGroup's group_id. If set to False, child task_id and group_id are not prefixed.
         Default is True.
-    :type prefix_group_id: bool
+    :type prerfix_group_id: bool

Review comment:
       Fixed the Typo.







[GitHub] [airflow] VBhojawala edited a comment on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
VBhojawala edited a comment on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-762140061


   Hi @casassg @yuqian90 @ashb,
   
   I have raised a new pull request with the suggested changes. Kindly review it.





[GitHub] [airflow] VBhojawala commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r575607940



##########
File path: airflow/utils/task_group.py
##########
@@ -95,8 +109,23 @@ def __init__(
             self.used_group_ids = self._parent_group.used_group_ids
 
         self._group_id = group_id
-        if self.group_id in self.used_group_ids:
-            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
+        # if given group_id already used assign suffix by incrementing largest used suffix integer
+        # Example : task_group ==> task_group__1 -> task_group__2 -> task_group__3
+        if group_id in self.used_group_ids:
+            base = re.split(r'__\d+$', group_id)[0]
+            print([(i, type(i)) for i in self.used_group_ids])

Review comment:
       ya, 







[GitHub] [airflow] casassg commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
casassg commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r578061718



##########
File path: tests/utils/test_task_group.py
##########
@@ -576,3 +541,372 @@ def test_task_without_dag():
     assert op1.dag == op2.dag == op3.dag
     assert dag.task_group.children.keys() == {"op1", "op2", "op3"}
     assert dag.task_group.children.keys() == dag.task_dict.keys()
+
+
+# taskgroup decorator tests
+
+
+def test_build_task_group_deco_context_manager():
+    """
+    Tests Following :
+    1. Nested TaskGroup creation using taskgroup decorator should create same TaskGroup which can be
+    created using TaskGroup context manager.
+    2. TaskGroup consisting Tasks created using task decorator.
+    3. Node Ids of dags created with taskgroup decorator.
+    """
+
+    from airflow.operators.python import task
+
+    # Creating Tasks
+    @task
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    @task
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End ]')
+
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        print(f'[ Task2 {value} ]')
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        return f'[ Task3 {value} ]'
+
+    @task
+    def task_4(value):
+        """ Dummy Task4"""
+        print(f'[ Task4 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+
+        @taskgroup()
+        def section_2(value2):
+            """ TaskGroup for grouping related Tasks"""
+            return task_4(task_3(value2))
+
+        op1 = task_2(task_1(value))
+        return section_2(op1)
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id="example_nested_task_group_decorator", start_date=execution_date, tags=["example"]
+    ) as dag:
+        t_start = task_start()
+        sec_1 = section_1(t_start)
+        sec_1.set_downstream(task_end())
+
+    # Testing TaskGroup created using taskgroup decorator
+    assert set(dag.task_group.children.keys()) == {"task_start", "task_end", "section_1"}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_1',
+        'section_1.task_2',
+        'section_1.section_2',
+    }
+
+    # Testing TaskGroup consisting Tasks created using task decorator
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_2'].downstream_task_ids == {'section_1.section_2.task_3'}
+    assert dag.task_dict['section_1.section_2.task_4'].downstream_task_ids == {'task_end'}
+
+    # Node IDs test
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.task_4'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_build_task_group_with_operators():
+    """  Tests DAG with Tasks created with *Operators and TaskGroup created with taskgroup decorator """
+
+    from airflow.operators.python import task
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup(group_id='section_1')
+    def section_a(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+        sec_1 = section_a(t_start.output)
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        sec_1.set_downstream(t_end)
+
+    # Testing Tasks in DAG
+    assert set(dag.task_group.children.keys()) == {'section_1', 'task_start', 'task_end'}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_2',
+        'section_1.task_3',
+        'section_1.task_1',
+    }
+
+    # Testing Tasks downstream
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_3'].downstream_task_ids == {'task_end'}
+
+
+def test_task_group_context_mix():
+    """ Test cases to check nested TaskGroup context manager with taskgroup decorator"""
+
+    from airflow.operators.python import task
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_2(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+
+        with TaskGroup("section_1", tooltip="section_1") as section_1:
+            sec_2 = section_2(t_start.output)
+            task_s1 = DummyOperator(task_id="task_1")
+            task_s2 = BashOperator(task_id="task_2", bash_command='echo 1')
+            task_s3 = DummyOperator(task_id="task_3")
+
+            sec_2.set_downstream(task_s1)
+            task_s1 >> [task_s2, task_s3]
+
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        t_start >> section_1 >> t_end
+
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_1'},
+                            {'id': 'section_1.section_2.task_2'},
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.downstream_join_id'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.task_3'},
+                    {'id': 'section_1.upstream_join_id'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_duplicate_task_group_id():
+    """ Testing automatic suffix assignment for duplicate group_id"""
+
+    from airflow.operators.python import task
+
+    @task(task_id='start_task')
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        print('[Task_start]')
+
+    @task(task_id='end_task')
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[Task_End]')
+
+    # Creating Tasks
+    @task(task_id='task')
+    def task_1():
+        """ Dummy Task1"""
+        print('[Task1]')
+
+    @task(task_id='task')
+    def task_2():
+        """ Dummy Task2"""
+        print('[Task2]')
+
+    @task(task_id='task1')
+    def task_3():
+        """ Dummy Task3"""
+        print('[Task3]')
+
+    @taskgroup(group_id='task_group1')
+    def task_group1():
+        task_start()
+        task_1()
+        task_2()
+
+    @taskgroup(group_id='task_group1')
+    def task_group2():
+        task_3()
+
+    @taskgroup(group_id='task_group1')
+    def task_group3():
+        task_end()
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_duplicate_task_group_id", start_date=execution_date, tags=["example"]) as dag:
+        task_group1()
+        task_group2()
+        task_group3()

Review comment:
       Makes sense. I don't think it's a big deal either way. If users do this, even though it's weird and can be confusing, they can do it. 
   
   Mostly want to make sure we are weighing the right trade-offs between the added complexity and the value of having this extra check.
   
   Not strongly opinionated though







[GitHub] [airflow] github-actions[bot] commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-762957417


   [The Workflow run](https://github.com/apache/airflow/actions/runs/496403502) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] dimberman commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-797653241


   Hi @VBhojawala are you interested in continuing this PR? I'm glad to either finish it for you or help you finish :)





[GitHub] [airflow] turbaszek commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r575665792



##########
File path: docs/apache-airflow/concepts.rst
##########
@@ -1121,6 +1121,26 @@ This animated gif shows the UI interactions. TaskGroups are expanded or collapse
 .. image:: img/task_group.gif
 
 
+TaskGroup can be created using ``@taskgroup decorator``, it takes one mandatory argument ``group_id`` which is same as constructor of TaskGroup class, if not given it copies function name as ``group_id``. It works exactly same as creating TaskGroup using context manager ``with TaskGroup('groupid') as section:``.

Review comment:
       ```suggestion
   TaskGroup can be created using ``@taskgroup`` decorator, it takes one mandatory argument ``group_id`` which is same as constructor of TaskGroup class, if not given it copies function name as ``group_id``. It works exactly same as creating TaskGroup using context manager ``with TaskGroup('groupid') as section:``.
   ```







[GitHub] [airflow] ashb commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-810952381


   Closing in favour of #15034 which continues this work.





[GitHub] [airflow] VBhojawala removed a comment on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
VBhojawala removed a comment on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-797851925


   Hi @dimberman ,
   
   Kindly review the PR below and help me finish it:
   https://github.com/apache/airflow/pull/13405
   
   related: #8970





[GitHub] [airflow] kaxil commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-777872122


   Can you rebase the PR on the latest master, please?





[GitHub] [airflow] github-actions[bot] commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-778618231


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ashb closed pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
ashb closed pull request #13479:
URL: https://github.com/apache/airflow/pull/13479


   





[GitHub] [airflow] turbaszek commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r575665749



##########
File path: airflow/utils/task_group.py
##########
@@ -95,8 +109,22 @@ def __init__(
             self.used_group_ids = self._parent_group.used_group_ids
 
         self._group_id = group_id
-        if self.group_id in self.used_group_ids:
-            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
+        # if given group_id already used assign suffix by incrementing largest used suffix integer
+        # Example : task_group ==> task_group__1 -> task_group__2 -> task_group__3
+        if group_id in self.used_group_ids:
+            base = re.split(r'__\d+$', group_id)[0]
+            suffixes = sorted(
+                [
+                    int(re.split(r'^.+__', used_group_id)[1])
+                    for used_group_id in self.used_group_ids
+                    if used_group_id is not None and re.match(rf'^{base}__\d+$', used_group_id)
+                ]
+            )
+            if not suffixes:
+                self._group_id += '__1'
+            else:
+                self._group_id = f'{base}__{suffixes[-1] + 1}'
+

Review comment:
       This feature allows users to reuse the same `TaskGroup` in the DAG, right?







[GitHub] [airflow] casassg commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
casassg commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r577797052



##########
File path: tests/utils/test_task_group.py
##########
@@ -576,3 +541,372 @@ def test_task_without_dag():
     assert op1.dag == op2.dag == op3.dag
     assert dag.task_group.children.keys() == {"op1", "op2", "op3"}
     assert dag.task_group.children.keys() == dag.task_dict.keys()
+
+
+# taskgroup decorator tests
+
+
+def test_build_task_group_deco_context_manager():
+    """
+    Tests Following :
+    1. Nested TaskGroup creation using taskgroup decorator should create same TaskGroup which can be
+    created using TaskGroup context manager.
+    2. TaskGroup consisting Tasks created using task decorator.
+    3. Node Ids of dags created with taskgroup decorator.
+    """
+
+    from airflow.operators.python import task
+
+    # Creating Tasks
+    @task
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    @task
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End ]')
+
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        print(f'[ Task2 {value} ]')
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        return f'[ Task3 {value} ]'
+
+    @task
+    def task_4(value):
+        """ Dummy Task4"""
+        print(f'[ Task4 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+
+        @taskgroup()
+        def section_2(value2):
+            """ TaskGroup for grouping related Tasks"""
+            return task_4(task_3(value2))
+
+        op1 = task_2(task_1(value))
+        return section_2(op1)
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id="example_nested_task_group_decorator", start_date=execution_date, tags=["example"]
+    ) as dag:
+        t_start = task_start()
+        sec_1 = section_1(t_start)
+        sec_1.set_downstream(task_end())
+
+    # Testing TaskGroup created using taskgroup decorator
+    assert set(dag.task_group.children.keys()) == {"task_start", "task_end", "section_1"}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_1',
+        'section_1.task_2',
+        'section_1.section_2',
+    }
+
+    # Testing TaskGroup consisting Tasks created using task decorator
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_2'].downstream_task_ids == {'section_1.section_2.task_3'}
+    assert dag.task_dict['section_1.section_2.task_4'].downstream_task_ids == {'task_end'}
+
+    # Node IDs test
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.task_4'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_build_task_group_with_operators():
+    """  Tests DAG with Tasks created with *Operators and TaskGroup created with taskgroup decorator """
+
+    from airflow.operators.python import task
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup(group_id='section_1')
+    def section_a(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+        sec_1 = section_a(t_start.output)
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        sec_1.set_downstream(t_end)
+
+    # Testing Tasks in DAG
+    assert set(dag.task_group.children.keys()) == {'section_1', 'task_start', 'task_end'}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_2',
+        'section_1.task_3',
+        'section_1.task_1',
+    }
+
+    # Testing Tasks downstream
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_3'].downstream_task_ids == {'task_end'}
+
+
+def test_task_group_context_mix():
+    """ Test cases to check nested TaskGroup context manager with taskgroup decorator"""
+
+    from airflow.operators.python import task
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_2(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+
+        with TaskGroup("section_1", tooltip="section_1") as section_1:
+            sec_2 = section_2(t_start.output)
+            task_s1 = DummyOperator(task_id="task_1")
+            task_s2 = BashOperator(task_id="task_2", bash_command='echo 1')
+            task_s3 = DummyOperator(task_id="task_3")
+
+            sec_2.set_downstream(task_s1)
+            task_s1 >> [task_s2, task_s3]
+
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        t_start >> section_1 >> t_end
+
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_1'},
+                            {'id': 'section_1.section_2.task_2'},
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.downstream_join_id'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.task_3'},
+                    {'id': 'section_1.upstream_join_id'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_duplicate_task_group_id():
+    """ Testing automatic suffix assignment for duplicate group_id"""
+
+    from airflow.operators.python import task
+
+    @task(task_id='start_task')
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        print('[Task_start]')
+
+    @task(task_id='end_task')
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[Task_End]')
+
+    # Creating Tasks
+    @task(task_id='task')
+    def task_1():
+        """ Dummy Task1"""
+        print('[Task1]')
+
+    @task(task_id='task')
+    def task_2():
+        """ Dummy Task2"""
+        print('[Task2]')
+
+    @task(task_id='task1')
+    def task_3():
+        """ Dummy Task3"""
+        print('[Task3]')
+
+    @taskgroup(group_id='task_group1')
+    def task_group1():
+        task_start()
+        task_1()
+        task_2()
+
+    @taskgroup(group_id='task_group1')
+    def task_group2():
+        task_3()
+
+    @taskgroup(group_id='task_group1')
+    def task_group3():
+        task_end()
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_duplicate_task_group_id", start_date=execution_date, tags=["example"]) as dag:
+        task_group1()
+        task_group2()
+        task_group3()

Review comment:
       I agree. But we can't retrieve the function behind previously called decorated functions, which means we can't know whether the function name is the same or not (i.e. whether it's the same function being called twice or two different functions with the same group_id). So while I agree this is an unintended side effect, I would leave it as is, since it allows us to instantiate the same function several times.
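
To make the behaviour under discussion concrete, here is a minimal sketch of automatic suffix assignment for a repeated group_id. It does not use Airflow; `unique_group_id` is a hypothetical helper, and the `__N` suffix format is an assumption for illustration rather than a confirmed detail of this PR.

```python
def unique_group_id(group_id, used_ids):
    """Return group_id, or group_id plus a numeric suffix if it is already taken."""
    if group_id not in used_ids:
        return group_id
    n = 1
    # Assumed suffix format: "<group_id>__<n>"
    while f"{group_id}__{n}" in used_ids:
        n += 1
    return f"{group_id}__{n}"


used = set()
assigned = []
for _ in range(3):
    # Three calls with the same requested id, as in the duplicate group_id test
    new_id = unique_group_id("task_group1", used)
    used.add(new_id)
    assigned.append(new_id)
```

Note that the helper only sees the id string, so it cannot tell one function called three times apart from three functions sharing a group_id, which is the limitation described above.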




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r578454269



##########
File path: tests/utils/test_task_group.py
##########
@@ -576,3 +541,372 @@ def test_task_without_dag():
     assert op1.dag == op2.dag == op3.dag
     assert dag.task_group.children.keys() == {"op1", "op2", "op3"}
     assert dag.task_group.children.keys() == dag.task_dict.keys()
+
+
+# taskgroup decorator tests
+
+
+def test_build_task_group_deco_context_manager():
+    """
+    Tests Following :
+    1. Nested TaskGroup creation using taskgroup decorator should create same TaskGroup which can be
+    created using TaskGroup context manager.
+    2. TaskGroup consisting Tasks created using task decorator.
+    3. Node Ids of dags created with taskgroup decorator.
+    """
+
+    from airflow.operators.python import task
+
+    # Creating Tasks
+    @task
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    @task
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End ]')
+
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        print(f'[ Task2 {value} ]')
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        return f'[ Task3 {value} ]'
+
+    @task
+    def task_4(value):
+        """ Dummy Task4"""
+        print(f'[ Task4 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+
+        @taskgroup()
+        def section_2(value2):
+            """ TaskGroup for grouping related Tasks"""
+            return task_4(task_3(value2))
+
+        op1 = task_2(task_1(value))
+        return section_2(op1)
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id="example_nested_task_group_decorator", start_date=execution_date, tags=["example"]
+    ) as dag:
+        t_start = task_start()
+        sec_1 = section_1(t_start)
+        sec_1.set_downstream(task_end())
+
+    # Testing TaskGroup created using taskgroup decorator
+    assert set(dag.task_group.children.keys()) == {"task_start", "task_end", "section_1"}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_1',
+        'section_1.task_2',
+        'section_1.section_2',
+    }
+
+    # Testing TaskGroup consisting Tasks created using task decorator
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_2'].downstream_task_ids == {'section_1.section_2.task_3'}
+    assert dag.task_dict['section_1.section_2.task_4'].downstream_task_ids == {'task_end'}
+
+    # Node IDs test
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.task_4'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_build_task_group_with_operators():
+    """  Tests DAG with Tasks created with *Operators and TaskGroup created with taskgroup decorator """
+
+    from airflow.operators.python import task
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup(group_id='section_1')
+    def section_a(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+        sec_1 = section_a(t_start.output)
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        sec_1.set_downstream(t_end)
+
+    # Testing Tasks in DAG
+    assert set(dag.task_group.children.keys()) == {'section_1', 'task_start', 'task_end'}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_2',
+        'section_1.task_3',
+        'section_1.task_1',
+    }
+
+    # Testing Tasks downstream
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_3'].downstream_task_ids == {'task_end'}
+
+
+def test_task_group_context_mix():
+    """ Test cases to check nested TaskGroup context manager with taskgroup decorator"""
+
+    from airflow.operators.python import task
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_2(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+
+        with TaskGroup("section_1", tooltip="section_1") as section_1:
+            sec_2 = section_2(t_start.output)
+            task_s1 = DummyOperator(task_id="task_1")
+            task_s2 = BashOperator(task_id="task_2", bash_command='echo 1')
+            task_s3 = DummyOperator(task_id="task_3")
+
+            sec_2.set_downstream(task_s1)
+            task_s1 >> [task_s2, task_s3]
+
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        t_start >> section_1 >> t_end
+
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_1'},
+                            {'id': 'section_1.section_2.task_2'},
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.downstream_join_id'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.task_3'},
+                    {'id': 'section_1.upstream_join_id'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_duplicate_task_group_id():
+    """ Testing automatic suffix assignment for duplicate group_id"""
+
+    from airflow.operators.python import task
+
+    @task(task_id='start_task')
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        print('[Task_start]')
+
+    @task(task_id='end_task')
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[Task_End]')
+
+    # Creating Tasks
+    @task(task_id='task')
+    def task_1():
+        """ Dummy Task1"""
+        print('[Task1]')
+
+    @task(task_id='task')
+    def task_2():
+        """ Dummy Task2"""
+        print('[Task2]')
+
+    @task(task_id='task1')
+    def task_3():
+        """ Dummy Task3"""
+        print('[Task3]')
+
+    @taskgroup(group_id='task_group1')
+    def task_group1():
+        task_start()
+        task_1()
+        task_2()
+
+    @taskgroup(group_id='task_group1')
+    def task_group2():
+        task_3()
+
+    @taskgroup(group_id='task_group1')
+    def task_group3():
+        task_end()
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_duplicate_task_group_id", start_date=execution_date, tags=["example"]) as dag:
+        task_group1()
+        task_group2()
+        task_group3()

Review comment:
       My thought was that this is more likely a bug than desired behaviour -- for instance someone copy-and-pasted a task group but forgot to update the name







[GitHub] [airflow] ashb commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r560010141



##########
File path: airflow/utils/task_group.py
##########
@@ -42,7 +56,7 @@ class TaskGroup(TaskMixin):
     :param prefix_group_id: If set to True, child task_id and group_id will be prefixed with
         this TaskGroup's group_id. If set to False, child task_id and group_id are not prefixed.
         Default is True.
-    :type prefix_group_id: bool
+    :type prerfix_group_id: bool

Review comment:
       Typo







[GitHub] [airflow] github-actions[bot] commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-778720234


   [The Workflow run](https://github.com/apache/airflow/actions/runs/565088281) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] ashb commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r577929436



##########
File path: tests/utils/test_task_group.py
##########
@@ -576,3 +541,372 @@ def test_task_without_dag():
     assert op1.dag == op2.dag == op3.dag
     assert dag.task_group.children.keys() == {"op1", "op2", "op3"}
     assert dag.task_group.children.keys() == dag.task_dict.keys()
+
+
+def test_duplicate_task_group_id():
+    """ Testing automatic suffix assignment for duplicate group_id"""
+
+    from airflow.operators.python import task
+
+    @task(task_id='start_task')
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        print('[Task_start]')
+
+    @task(task_id='end_task')
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[Task_End]')
+
+    # Creating Tasks
+    @task(task_id='task')
+    def task_1():
+        """ Dummy Task1"""
+        print('[Task1]')
+
+    @task(task_id='task')
+    def task_2():
+        """ Dummy Task2"""
+        print('[Task2]')
+
+    @task(task_id='task1')
+    def task_3():
+        """ Dummy Task3"""
+        print('[Task3]')
+
+    @taskgroup(group_id='task_group1')
+    def task_group1():
+        task_start()
+        task_1()
+        task_2()
+
+    @taskgroup(group_id='task_group1')
+    def task_group2():
+        task_3()
+
+    @taskgroup(group_id='task_group1')
+    def task_group3():
+        task_end()
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_duplicate_task_group_id", start_date=execution_date, tags=["example"]) as dag:
+        task_group1()
+        task_group2()
+        task_group3()

Review comment:
       We can't check what task group decorator created the previous function, but we could check if the _current_ TG was created by the current decorated function, and if not then it would fail.
   
   We could keep a set of TG objects in the wrapper fn, and if the TG already exists on the DAG, and the TG is not in that set, then it was created by some other decorator -> raise an error.
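
The idea above could be sketched roughly as follows. This is a toy model, not Airflow code: `FakeDag`, `FakeGroup`, `DuplicateGroupError` and the explicit `dag` argument are all hypothetical stand-ins, and the `__N` suffix format is assumed.

```python
import functools


class DuplicateGroupError(Exception):
    """Raised when a group_id is reused by a different decorated function."""


class FakeGroup:
    """Hypothetical stand-in for TaskGroup; it only carries an id."""

    def __init__(self, group_id):
        self.group_id = group_id


class FakeDag:
    """Hypothetical stand-in for a DAG; it only tracks groups by id."""

    def __init__(self):
        self.groups = {}


def taskgroup(group_id):
    def decorator(func):
        created = set()  # groups created by *this* decorated function

        @functools.wraps(func)
        def wrapper(dag, *args, **kwargs):
            existing = dag.groups.get(group_id)
            if existing is not None and existing not in created:
                # The id is taken, and not by us: some other decorator made it.
                raise DuplicateGroupError(f"group_id {group_id!r} already used")
            # Same function called again: fall back to a suffixed id.
            new_id, n = group_id, 0
            while new_id in dag.groups:
                n += 1
                new_id = f"{group_id}__{n}"
            group = FakeGroup(new_id)
            dag.groups[new_id] = group
            created.add(group)
            func(*args, **kwargs)
            return group

        return wrapper

    return decorator


@taskgroup("task_group1")
def group_a():
    pass


@taskgroup("task_group1")
def group_b():
    pass
```

With this sketch, calling `group_a` twice on the same `FakeDag` yields a suffixed second group, while calling `group_b` afterwards raises `DuplicateGroupError`, which covers the copy-and-paste case.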







[GitHub] [airflow] VBhojawala commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-777937544


   Hi @kaxil, I have rebased the PR.





[GitHub] [airflow] VBhojawala commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-797851925


   Hi @dimberman,
   
   Kindly review the PR below and help me finish it:
   https://github.com/apache/airflow/pull/13405
   
   related: #8970





[GitHub] [airflow] ashb commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r577677520



##########
File path: airflow/utils/task_group.py
##########
@@ -19,11 +19,25 @@
 A TaskGroup is a collection of closely related tasks on the same DAG that should be grouped
 together when the DAG is displayed graphically.
 """
-
-from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Sequence, Set, Union
+import functools
+import re
+from typing import (
+    TYPE_CHECKING,
+    Callable,
+    Dict,
+    Generator,
+    List,
+    Optional,
+    Sequence,
+    Set,
+    TypeVar,
+    Union,
+    cast,
+)
 
 from airflow.exceptions import AirflowException, DuplicateTaskIdFound
 from airflow.models.taskmixin import TaskMixin
+from airflow.utils.decorators import signature

Review comment:
       ```suggestion
   from inspect import signature
   ```
   
   Please import directly from the module, not from somewhere else where we use it.
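
For reference, `signature` is part of the standard library's `inspect` module. A small sketch of how a decorator could use it to validate call arguments up front; `check_call` and `section` are hypothetical names used only for illustration, not code from this PR.

```python
from inspect import signature


def check_call(func, *args, **kwargs):
    """Bind the arguments to func's parameters; raises TypeError on a mismatch."""
    bound = signature(func).bind(*args, **kwargs)
    bound.apply_defaults()  # fill in defaults for parameters that were not passed
    return dict(bound.arguments)


def section(value, retries=3):
    """Hypothetical callable standing in for a decorated function."""
    return value


bound_args = check_call(section, 1)
```

Importing from `inspect` directly means the module does not depend on another module happening to re-export the name.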

##########
File path: tests/utils/test_task_group.py
##########
@@ -576,3 +541,372 @@ def test_task_without_dag():
     assert op1.dag == op2.dag == op3.dag
     assert dag.task_group.children.keys() == {"op1", "op2", "op3"}
     assert dag.task_group.children.keys() == dag.task_dict.keys()
+
+
+# taskgroup decorator tests
+
+
+def test_build_task_group_deco_context_manager():
+    """
+    Tests Following :
+    1. Nested TaskGroup creation using taskgroup decorator should create same TaskGroup which can be
+    created using TaskGroup context manager.
+    2. TaskGroup consisting Tasks created using task decorator.
+    3. Node Ids of dags created with taskgroup decorator.
+    """
+
+    from airflow.operators.python import task
+
+    # Creating Tasks
+    @task
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    @task
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End ]')
+
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        print(f'[ Task2 {value} ]')
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        return f'[ Task3 {value} ]'
+
+    @task
+    def task_4(value):
+        """ Dummy Task4"""
+        print(f'[ Task4 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+
+        @taskgroup()
+        def section_2(value2):
+            """ TaskGroup for grouping related Tasks"""
+            return task_4(task_3(value2))
+
+        op1 = task_2(task_1(value))
+        return section_2(op1)
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id="example_nested_task_group_decorator", start_date=execution_date, tags=["example"]
+    ) as dag:
+        t_start = task_start()
+        sec_1 = section_1(t_start)
+        sec_1.set_downstream(task_end())
+
+    # Testing TaskGroup created using taskgroup decorator
+    assert set(dag.task_group.children.keys()) == {"task_start", "task_end", "section_1"}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_1',
+        'section_1.task_2',
+        'section_1.section_2',
+    }
+
+    # Testing TaskGroup consisting Tasks created using task decorator
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_2'].downstream_task_ids == {'section_1.section_2.task_3'}
+    assert dag.task_dict['section_1.section_2.task_4'].downstream_task_ids == {'task_end'}
+
+    # Node IDs test
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.task_4'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_build_task_group_with_operators():
+    """  Tests DAG with Tasks created with *Operators and TaskGroup created with taskgroup decorator """
+
+    from airflow.operators.python import task
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup(group_id='section_1')
+    def section_a(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+        sec_1 = section_a(t_start.output)
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        sec_1.set_downstream(t_end)
+
+    # Testing Tasks in DAG
+    assert set(dag.task_group.children.keys()) == {'section_1', 'task_start', 'task_end'}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_2',
+        'section_1.task_3',
+        'section_1.task_1',
+    }
+
+    # Testing Tasks downstream
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_3'].downstream_task_ids == {'task_end'}
+
+
+def test_task_group_context_mix():
+    """ Test cases to check nested TaskGroup context manager with taskgroup decorator"""
+
+    from airflow.operators.python import task
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_2(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+
+        with TaskGroup("section_1", tooltip="section_1") as section_1:
+            sec_2 = section_2(t_start.output)
+            task_s1 = DummyOperator(task_id="task_1")
+            task_s2 = BashOperator(task_id="task_2", bash_command='echo 1')
+            task_s3 = DummyOperator(task_id="task_3")
+
+            sec_2.set_downstream(task_s1)
+            task_s1 >> [task_s2, task_s3]
+
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        t_start >> section_1 >> t_end
+
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_1'},
+                            {'id': 'section_1.section_2.task_2'},
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.downstream_join_id'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.task_3'},
+                    {'id': 'section_1.upstream_join_id'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_duplicate_task_group_id():
+    """ Testing automatic suffix assignment for duplicate group_id"""
+
+    from airflow.operators.python import task
+
+    @task(task_id='start_task')
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        print('[Task_start]')
+
+    @task(task_id='end_task')
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[Task_End]')
+
+    # Creating Tasks
+    @task(task_id='task')
+    def task_1():
+        """ Dummy Task1"""
+        print('[Task1]')
+
+    @task(task_id='task')
+    def task_2():
+        """ Dummy Task2"""
+        print('[Task2]')
+
+    @task(task_id='task1')
+    def task_3():
+        """ Dummy Task3"""
+        print('[Task3]')
+
+    @taskgroup(group_id='task_group1')
+    def task_group1():
+        task_start()
+        task_1()
+        task_2()
+
+    @taskgroup(group_id='task_group1')
+    def task_group2():
+        task_3()
+
+    @taskgroup(group_id='task_group1')
+    def task_group3():
+        task_end()
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_duplicate_task_group_id", start_date=execution_date, tags=["example"]) as dag:
+        task_group1()
+        task_group2()
+        task_group3()

Review comment:
       I am not sure this is the right behaviour!
   
   These are _separate_ task groups with the same id. That feels like a logic bug on the DAG author's part and should throw an error.
   
   What do others think of this?
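
For comparison, the stricter behaviour argued for here (reject a repeated group_id instead of renaming it) could look roughly like the sketch below. It is plain Python, not Airflow code: `DuplicateGroupIdError` and `register_group_id` are hypothetical names used only for illustration, standing in for the `DuplicateTaskIdFound` check that this PR removes from `TaskGroup.__init__`.

```python
class DuplicateGroupIdError(ValueError):
    """Hypothetical stand-in for Airflow's DuplicateTaskIdFound."""


def register_group_id(group_id, used_group_ids):
    """Record group_id, failing loudly if the DAG already has a group with that id."""
    if group_id in used_group_ids:
        raise DuplicateGroupIdError(
            f"group_id '{group_id}' has already been added to the DAG"
        )
    used_group_ids.add(group_id)
    return group_id


used = set()
register_group_id('task_group1', used)  # first use is accepted
try:
    register_group_id('task_group1', used)  # second use is treated as an author error
except DuplicateGroupIdError as err:
    print(err)
```

With this approach the three `task_group1` groups in the test above would fail at DAG definition time rather than being silently renamed.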




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-778631925


   [The Workflow run](https://github.com/apache/airflow/actions/runs/563905430) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] VBhojawala commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-762140061


   Hi @casassg ,
   
   I have raised a new pull request with the suggested changes. Kindly review it.





[GitHub] [airflow] github-actions[bot] commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-762957189


   [The Workflow run](https://github.com/apache/airflow/actions/runs/496403497) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] dimberman commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r593344066



##########
File path: tests/utils/test_task_group.py
##########
@@ -576,3 +541,372 @@ def test_task_without_dag():
     assert op1.dag == op2.dag == op3.dag
     assert dag.task_group.children.keys() == {"op1", "op2", "op3"}
     assert dag.task_group.children.keys() == dag.task_dict.keys()
+
+
+# taskgroup decorator tests
+
+
+def test_build_task_group_deco_context_manager():
+    """
+    Tests Following :
+    1. Nested TaskGroup creation using taskgroup decorator should create same TaskGroup which can be
+    created using TaskGroup context manager.
+    2. TaskGroup consisting Tasks created using task decorator.
+    3. Node Ids of dags created with taskgroup decorator.
+    """
+
+    from airflow.operators.python import task
+
+    # Creating Tasks
+    @task
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    @task
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End ]')
+
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        print(f'[ Task2 {value} ]')
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        return f'[ Task3 {value} ]'
+
+    @task
+    def task_4(value):
+        """ Dummy Task4"""
+        print(f'[ Task4 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_1(value):
+        """ TaskGroup for grouping related Tasks"""
+
+        @taskgroup()
+        def section_2(value2):
+            """ TaskGroup for grouping related Tasks"""
+            return task_4(task_3(value2))
+
+        op1 = task_2(task_1(value))
+        return section_2(op1)
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(
+        dag_id="example_nested_task_group_decorator", start_date=execution_date, tags=["example"]
+    ) as dag:
+        t_start = task_start()
+        sec_1 = section_1(t_start)
+        sec_1.set_downstream(task_end())
+
+    # Testing TaskGroup created using taskgroup decorator
+    assert set(dag.task_group.children.keys()) == {"task_start", "task_end", "section_1"}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_1',
+        'section_1.task_2',
+        'section_1.section_2',
+    }
+
+    # Testing TaskGroup consisting Tasks created using task decorator
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_2'].downstream_task_ids == {'section_1.section_2.task_3'}
+    assert dag.task_dict['section_1.section_2.task_4'].downstream_task_ids == {'task_end'}
+
+    # Node IDs test
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.task_4'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_build_task_group_with_operators():
+    """  Tests DAG with Tasks created with *Operators and TaskGroup created with taskgroup decorator """
+
+    from airflow.operators.python import task
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup(group_id='section_1')
+    def section_a(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+        sec_1 = section_a(t_start.output)
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        sec_1.set_downstream(t_end)
+
+    # Testing Tasks in DAG
+    assert set(dag.task_group.children.keys()) == {'section_1', 'task_start', 'task_end'}
+    assert set(dag.task_group.children['section_1'].children.keys()) == {
+        'section_1.task_2',
+        'section_1.task_3',
+        'section_1.task_1',
+    }
+
+    # Testing Tasks downstream
+    assert dag.task_dict['task_start'].downstream_task_ids == {'section_1.task_1'}
+    assert dag.task_dict['section_1.task_3'].downstream_task_ids == {'task_end'}
+
+
+def test_task_group_context_mix():
+    """ Test cases to check nested TaskGroup context manager with taskgroup decorator"""
+
+    from airflow.operators.python import task
+
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        return '[Task_start]'
+
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[ Task_End  ]')
+
+    # Creating Tasks
+    @task
+    def task_1(value):
+        """ Dummy Task1"""
+        return f'[ Task1 {value} ]'
+
+    @task
+    def task_2(value):
+        """ Dummy Task2"""
+        return f'[ Task2 {value} ]'
+
+    @task
+    def task_3(value):
+        """ Dummy Task3"""
+        print(f'[ Task3 {value} ]')
+
+    # Creating TaskGroups
+    @taskgroup
+    def section_2(value):
+        """ TaskGroup for grouping related Tasks"""
+        return task_3(task_2(task_1(value)))
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_task_group_decorator_mix", start_date=execution_date, tags=["example"]) as dag:
+        t_start = PythonOperator(task_id='task_start', python_callable=task_start, dag=dag)
+
+        with TaskGroup("section_1", tooltip="section_1") as section_1:
+            sec_2 = section_2(t_start.output)
+            task_s1 = DummyOperator(task_id="task_1")
+            task_s2 = BashOperator(task_id="task_2", bash_command='echo 1')
+            task_s3 = DummyOperator(task_id="task_3")
+
+            sec_2.set_downstream(task_s1)
+            task_s1 >> [task_s2, task_s3]
+
+        t_end = PythonOperator(task_id='task_end', python_callable=task_end, dag=dag)
+        t_start >> section_1 >> t_end
+
+    node_ids = {
+        'id': None,
+        'children': [
+            {
+                'id': 'section_1',
+                'children': [
+                    {
+                        'id': 'section_1.section_2',
+                        'children': [
+                            {'id': 'section_1.section_2.task_1'},
+                            {'id': 'section_1.section_2.task_2'},
+                            {'id': 'section_1.section_2.task_3'},
+                            {'id': 'section_1.section_2.downstream_join_id'},
+                        ],
+                    },
+                    {'id': 'section_1.task_1'},
+                    {'id': 'section_1.task_2'},
+                    {'id': 'section_1.task_3'},
+                    {'id': 'section_1.upstream_join_id'},
+                    {'id': 'section_1.downstream_join_id'},
+                ],
+            },
+            {'id': 'task_end'},
+            {'id': 'task_start'},
+        ],
+    }
+
+    assert extract_node_id(task_group_to_dict(dag.task_group)) == node_ids
+
+
+def test_duplicate_task_group_id():
+    """ Testing automatic suffix assignment for duplicate group_id"""
+
+    from airflow.operators.python import task
+
+    @task(task_id='start_task')
+    def task_start():
+        """Dummy Task which is First Task of Dag """
+        print('[Task_start]')
+
+    @task(task_id='end_task')
+    def task_end():
+        """Dummy Task which is Last Task of Dag"""
+        print('[Task_End]')
+
+    # Creating Tasks
+    @task(task_id='task')
+    def task_1():
+        """ Dummy Task1"""
+        print('[Task1]')
+
+    @task(task_id='task')
+    def task_2():
+        """ Dummy Task2"""
+        print('[Task2]')
+
+    @task(task_id='task1')
+    def task_3():
+        """ Dummy Task3"""
+        print('[Task3]')
+
+    @taskgroup(group_id='task_group1')
+    def task_group1():
+        task_start()
+        task_1()
+        task_2()
+
+    @taskgroup(group_id='task_group1')
+    def task_group2():
+        task_3()
+
+    @taskgroup(group_id='task_group1')
+    def task_group3():
+        task_end()
+
+    execution_date = pendulum.parse("20201109")
+    with DAG(dag_id="example_duplicate_task_group_id", start_date=execution_date, tags=["example"]) as dag:
+        task_group1()
+        task_group2()
+        task_group3()

Review comment:
       I agree with @casassg here. It would be very hard to enforce this. I'm especially imagining situations where users import taskgroups from libraries and therefore do not have control over the task_group_id. That said, it would make sense for us to allow users to override the task_group_id via a kwarg in case they import two functions with the same id name.
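
One possible shape for such an override is sketched below in plain Python, without Airflow. Everything in it is an assumption made for illustration: `taskgroup_sketch` is a hypothetical stand-in for the decorator, `override_group_id` is a made-up keyword name, and the returned dict only shows which id would be used. It is not the API proposed in this PR.

```python
import functools


def taskgroup_sketch(python_callable=None, *, group_id=None):
    """Hypothetical stand-in for the decorator; it only resolves the group id."""

    def decorate(func):
        # Fall back to the callable's name, as the PR description says.
        default_group_id = group_id or func.__name__

        @functools.wraps(func)
        def factory(*args, override_group_id=None, **kwargs):
            # A call-site override wins over the id chosen by the library author.
            effective_id = override_group_id or default_group_id
            return {"group_id": effective_id, "result": func(*args, **kwargs)}

        return factory

    # Support both the bare @taskgroup_sketch form and the called form.
    if callable(python_callable):
        return decorate(python_callable)
    return decorate


# Two "library" functions that happen to pick the same group id.
@taskgroup_sketch(group_id="load")
def load_from_library_a(value):
    return f"a:{value}"


@taskgroup_sketch(group_id="load")
def load_from_library_b(value):
    return f"b:{value}"


first = load_from_library_a("x")
second = load_from_library_b("x", override_group_id="load_b")
```

Here the DAG author resolves the clash at the call site, so neither imported function has to change.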







[GitHub] [airflow] VBhojawala commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
VBhojawala commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r575674196



##########
File path: airflow/utils/task_group.py
##########
@@ -95,8 +109,22 @@ def __init__(
             self.used_group_ids = self._parent_group.used_group_ids
 
         self._group_id = group_id
-        if self.group_id in self.used_group_ids:
-            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
+        # if given group_id already used assign suffix by incrementing largest used suffix integer
+        # Example : task_group ==> task_group__1 -> task_group__2 -> task_group__3
+        if group_id in self.used_group_ids:
+            base = re.split(r'__\d+$', group_id)[0]
+            suffixes = sorted(
+                [
+                    int(re.split(r'^.+__', used_group_id)[1])
+                    for used_group_id in self.used_group_ids
+                    if used_group_id is not None and re.match(rf'^{base}__\d+$', used_group_id)
+                ]
+            )
+            if not suffixes:
+                self._group_id += '__1'
+            else:
+                self._group_id = f'{base}__{suffixes[-1] + 1}'
+

Review comment:
       Correct
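
The suffix logic in the hunk above can be tried in isolation with a sketch like the following. It mirrors the diff as a free function; the name `next_group_id` is hypothetical, and the `re.escape` call is an addition not present in the diff (it guards against group ids containing regex metacharacters).

```python
import re


def next_group_id(group_id, used_group_ids):
    """Return group_id, or a '__N'-suffixed variant if it is already taken."""
    if group_id not in used_group_ids:
        return group_id

    # Strip any existing numeric suffix: 'task_group__2' -> 'task_group'.
    base = re.split(r'__\d+$', group_id)[0]
    suffixes = sorted(
        int(re.split(r'^.+__', used)[1])
        for used in used_group_ids
        # The root TaskGroup has group_id None, so skip it.
        if used is not None and re.match(rf'^{re.escape(base)}__\d+$', used)
    )
    if not suffixes:
        return f'{group_id}__1'
    # Continue from the largest suffix seen so far.
    return f'{base}__{suffixes[-1] + 1}'


used = {None}
for _ in range(3):
    new_id = next_group_id('task_group1', used)
    used.add(new_id)
    print(new_id)  # task_group1, then task_group1__1, then task_group1__2
```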







[GitHub] [airflow] ashb commented on pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#issuecomment-799306754


   > Hi @dimberman ,
   > 
   > Kindly review below PR and help me to finish it :
   > #13405
   > 
   > related: #8970
   
   @VBhojawala commenting on another unrelated PR asking for a review is considered poor etiquette - please don't do this. 





[GitHub] [airflow] kaxil commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r575325492



##########
File path: airflow/utils/task_group.py
##########
@@ -95,8 +109,23 @@ def __init__(
             self.used_group_ids = self._parent_group.used_group_ids
 
         self._group_id = group_id
-        if self.group_id in self.used_group_ids:
-            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
+        # if given group_id already used assign suffix by incrementing largest used suffix integer
+        # Example : task_group ==> task_group__1 -> task_group__2 -> task_group__3
+        if group_id in self.used_group_ids:
+            base = re.split(r'__\d+$', group_id)[0]
+            print([(i, type(i)) for i in self.used_group_ids])

Review comment:
       We don't want a print statement here I think -- (maybe a remnant of debugging)
   
   







[GitHub] [airflow] dimberman commented on a change in pull request #13479: Added taskgroup decorator

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #13479:
URL: https://github.com/apache/airflow/pull/13479#discussion_r592709957



##########
File path: airflow/example_dags/example_task_group_decorator.py
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the @taskgroup decorator."""
+
+from airflow.decorators import task, taskgroup
+from airflow.models.dag import DAG
+from airflow.utils.dates import days_ago
+
+
+# [START howto_task_group_decorator]
+# Creating Tasks
+@task
+def task_start():
+    """Dummy Task which is First Task of Dag """
+    return '[Task_start]'
+
+
+@task
+def task_1(value):
+    """ Dummy Task1"""
+    return f'[ Task1 {value} ]'
+
+
+@task
+def task_2(value):
+    """ Dummy Task2"""
+    return f'[ Task2 {value} ]'
+
+
+@task
+def task_3(value):
+    """ Dummy Task3"""
+    print(f'[ Task3 {value} ]')
+
+
+@task
+def task_end():
+    """ Dummy Task which is Last Task of Dag """
+    print('[ Task_End  ]')
+
+
+# Creating TaskGroups
+@taskgroup

Review comment:
       might be nicer to call it "@task_group" instead of "@taskgroup" since there's no capitalization

##########
File path: airflow/example_dags/example_task_group_decorator.py
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the @taskgroup decorator."""
+
+from airflow.decorators import task, taskgroup
+from airflow.models.dag import DAG
+from airflow.utils.dates import days_ago
+
+
+# [START howto_task_group_decorator]
+# Creating Tasks
+@task
+def task_start():
+    """Dummy Task which is First Task of Dag """
+    return '[Task_start]'
+
+
+@task
+def task_1(value):
+    """ Dummy Task1"""
+    return f'[ Task1 {value} ]'
+
+
+@task
+def task_2(value):
+    """ Dummy Task2"""
+    return f'[ Task2 {value} ]'
+
+
+@task
+def task_3(value):
+    """ Dummy Task3"""
+    print(f'[ Task3 {value} ]')
+
+
+@task
+def task_end():
+    """ Dummy Task which is Last Task of Dag """
+    print('[ Task_End  ]')
+
+
+# Creating TaskGroups
+@taskgroup
+def section_1(value):
+    """ TaskGroup for grouping related Tasks"""
+    return task_3(task_2(task_1(value)))

Review comment:
       Can you unwind this? I wouldn't want to encourage people to nest like this.
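
A rough sketch of what the unwound form could look like, next to the nested one. The `@task` and `@taskgroup` decorators are left out so that it runs without Airflow, and `task_3` returns its string instead of printing it so that the two versions can be compared; both are simplifications for illustration, not the example DAG itself.

```python
def task_1(value):
    return f'[ Task1 {value} ]'


def task_2(value):
    return f'[ Task2 {value} ]'


def task_3(value):
    # The example task prints; returning the string keeps the sketch checkable.
    return f'[ Task3 {value} ]'


def section_1_nested(value):
    # Form used in the example: the call order has to be read inside-out.
    return task_3(task_2(task_1(value)))


def section_1_unwound(value):
    # Same chain, one step per line, in execution order.
    first = task_1(value)
    second = task_2(first)
    return task_3(second)
```

Both functions build the same chain; the unwound one names each intermediate result, which also gives a place to attach further dependencies later.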



