Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/04 09:59:47 UTC

[GitHub] [airflow] yuqian90 opened a new pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

yuqian90 opened a new pull request #10153:
URL: https://github.com/apache/airflow/pull/10153


   This is still a work in progress. I really liked the idea from @ashb and a few other people of treating TaskGroup purely as a "UI grouping concept". It's much simpler than the existing `SubDagOperator` approach. With some syntactic sugar such as `__rshift__` and `__lshift__`, creating repeated or nested sections in a DAG becomes much easier.
   
   @xinbinhuang already attempted this in https://github.com/apache/airflow/pull/9243. I'm trying to simplify the idea further.
   In this PR, `TaskGroup` is just a utility class used to construct sub-sections of a DAG. It does not need to be persisted to the db at all. The only thing the UI needs to know is the `task_group.group_id` of a task. Once we know the group_id, we should be able to change the Web UI to render tasks sharing the same group_id close together as a single node (the UI part has not been done in this PR).
   
   This is the example_task_group that this PR adds:
   When zoomed out, the DAG graph should look like this:
   ![image](https://user-images.githubusercontent.com/6637585/89280726-9d80b200-d67b-11ea-8d15-632635ce5dea.png)
   
   When zoomed in, the DAG graph should look like this:
   ![image](https://user-images.githubusercontent.com/6637585/89280731-a2456600-d67b-11ea-9d6c-056bd08e8438.png)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] houqp commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r477713551



##########
File path: airflow/models/baseoperator.py
##########
@@ -382,7 +389,16 @@ def __init__(
                 stacklevel=3
             )
         validate_key(task_id)
-        self.task_id = task_id
+        self.label = task_id
+
+        # Prefix task_id with group_id
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        if task_group:
+            self.task_id = f"{task_group.group_id}.{self.label}" if task_group.group_id else self.label

Review comment:
       I see where you are coming from now. I get that the semantics of task_id will remain the same if task groups are not used. I am trying to see whether we can preserve that for grouped tasks as well, so it will be less surprising for people when they adopt it.
   
   I agree with you that asking users to manually label tasks is not a good experience and we should not go that route.
   
   It looks like the core of the problem is that task names could conflict between groups. Is it an option to simply disallow tasks with conflicting names across groups? Or do we have to support task_id namespacing by group because of how it's handled in `SubDagOperator`? I feel like this would simplify the design a lot and make it truly a UI-only feature.
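To make the "disallow conflicting names" alternative concrete, here is a rough sketch using a hypothetical helper (not part of this PR): task ids stay exactly as written, the group is kept only as UI metadata, and a DAG-wide check fails fast on any duplicate.

```python
from typing import Dict, List, Tuple


def register_tasks(tasks: List[Tuple[str, str]]) -> Dict[str, str]:
    """Map each unprefixed task_id to its group (UI metadata only).

    ``tasks`` is a list of (group_id, task_id) pairs; group_id "" means the root.
    Raises ValueError if a task_id is reused anywhere in the DAG.
    """
    group_of: Dict[str, str] = {}
    for group_id, task_id in tasks:
        if task_id in group_of:
            raise ValueError(
                f"task_id {task_id!r} is already used in group {group_of[task_id]!r}"
            )
        group_of[task_id] = group_id
    return group_of


print(register_tasks([("", "task1"), ("group234", "task2"), ("group34", "task3")]))
# {'task1': '', 'task2': 'group234', 'task3': 'group34'}

try:
    register_tasks([("group_a", "extract"), ("group_b", "extract")])
except ValueError as err:
    print(err)  # task_id 'extract' is already used in group 'group_a'
```

The trade-off is that repeated sections of a DAG would need distinct task names in every copy, which is what prefixing with the group_id avoids.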







[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r469280080



##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+TaskGroup
+"""
+
+from typing import List, Optional
+
+
+class TaskGroup:
+    """
+    A collection of tasks. Tasks within the TaskGroup have their task_id prefixed with the
+    group_id of the TaskGroup. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+    """
+    def __init__(self, group_id, parent_group=None):
+        if group_id is None:
+            # This creates a root TaskGroup.
+            self.parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+            self.parent_group = parent_group or TaskGroupContext.get_current_task_group()
+
+        self._group_id = group_id
+        if self.parent_group:
+            self.parent_group.add(self)
+        self.children = {}
+
+    @classmethod
+    def create_root(cls):
+        """
+        Create a root TaskGroup with no group_id or parent.
+        """
+        return cls(group_id=None)
+
+    @property
+    def is_root(self):
+        """
+        Returns True if this TaskGroup is the root TaskGroup. Otherwise False
+        """
+        return not self.parent_group
+
+    def __iter__(self):
+        for child in self.children.values():
+            if isinstance(child, TaskGroup):
+                for inner_task in child:
+                    yield inner_task
+            else:
+                yield child
+
+    def add(self, task):
+        """
+        Add a task to this TaskGroup.
+        """
+        if task.label in self.children:
+            raise ValueError(f"Duplicate label {task.label} in {self.group_id}")
+        self.children[task.label] = task
+
+    @property
+    def label(self):
+        """
+        group_id excluding parent's group_id.
+        """
+        return self._group_id
+
+    @property
+    def group_id(self):
+        """
+        group_id is prefixed with parent group_id if applicable.
+        """
+        if not self.group_ids:
+            return None
+
+        return ".".join(self.group_ids)
+
+    @property
+    def group_ids(self):
+        """
+        Returns all the group_id of nested TaskGroups as a list, starting from the top.
+        """
+        return list(self._group_ids())[1:]
+
+    def _group_ids(self):
+        if self.parent_group:
+            for group_id in self.parent_group._group_ids():  # pylint: disable=protected-access
+                yield group_id
+
+        yield self._group_id
+
+    def set_downstream(self, task_or_task_list) -> None:

Review comment:
       Sure, will do.







[GitHub] [airflow] houqp commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r471805675



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -626,3 +653,60 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
         if ver != cls.SERIALIZER_VERSION:
             raise ValueError("Unsure how to deserialize version {!r}".format(ver))
         return cls.deserialize_dag(serialized_obj['dag'])
+
+
+class SerializedTaskGroup(TaskGroup, BaseSerialization):
+    """
+    A JSON serializable representation of TaskGroup.
+    """
+    @classmethod
+    def serialize_task_group(cls, task_group: TaskGroup) -> Union[Dict[str, Any], None]:
+        """Serializes TaskGroup into a JSON object.
+        """
+        if not task_group:
+            return None
+
+        serialize_group = {}
+        serialize_group["_group_id"] = task_group._group_id  # pylint: disable=protected-access
+
+        serialize_group['children'] = {  # type: ignore
+            label: (DAT.OP, child.task_id)
+            if isinstance(child, BaseOperator) else
+            (DAT.TASK_GROUP, SerializedTaskGroup.serialize_task_group(child))
+            for label, child in task_group.children.items()
+        }
+        serialize_group['tooltip'] = task_group.tooltip
+        serialize_group['ui_color'] = task_group.ui_color
+        serialize_group['ui_fgcolor'] = task_group.ui_fgcolor
+        return serialize_group
+
+    @classmethod
+    def deserialize_task_group(
+        cls,
+        encoded_group: Dict[str, Any],
+        parent_group: Union[TaskGroup, None],
+        task_dict: Dict[str, BaseOperator]
+    ) -> Union[TaskGroup, None]:

Review comment:
       For nullable types, it's better to use `Optional`
   
   ```suggestion
       ) -> Optional[TaskGroup]:
   ```
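For reference, `Optional[X]` is shorthand for `Union[X, None]`, so the suggestion is about readability rather than behavior. A tiny check with the standard `typing` module; the `TaskGroup` class and the simplified `deserialize_task_group` below are hypothetical stand-ins.

```python
from typing import Optional, Union


class TaskGroup:  # stand-in for airflow.utils.task_group.TaskGroup
    pass


# typing normalizes both spellings to the same type, so they compare equal.
assert Optional[TaskGroup] == Union[TaskGroup, None]


def deserialize_task_group(encoded_group: dict) -> Optional[TaskGroup]:
    # Hypothetical simplified signature: returns None for an empty payload.
    return TaskGroup() if encoded_group else None


print(deserialize_task_group({}))  # None
```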

##########
File path: airflow/models/baseoperator.py
##########
@@ -359,9 +363,12 @@ def __init__(
         do_xcom_push: bool = True,
         inlets: Optional[Any] = None,
         outlets: Optional[Any] = None,
+        task_group: Optional["TaskGroup"] = None,
         **kwargs
     ):
         from airflow.models.dag import DagContext
+        from airflow.utils.task_group import TaskGroupContext

Review comment:
       Is this to avoid a circular import? I thought having `if TYPE_CHECKING` in utils.task_group would be enough.

##########
File path: airflow/models/baseoperator.py
##########
@@ -380,7 +387,15 @@ def __init__(
                 stacklevel=3
             )
         validate_key(task_id)
-        self.task_id = task_id
+        self._task_id = task_id

Review comment:
       I am a bit concerned about changing the semantics of `self.task_id` here for a UI feature. There is Airflow code written with the assumption that this is the id used for querying the database.
   
   If the group-prefixed "task_id" is only intended for visualization, would it make more sense to create a new attribute for it instead of overriding the existing one?
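A sketch of what that could look like, assuming a toy `Operator` class and a hypothetical `ui_node_id` property (neither is Airflow API): `task_id` keeps its database meaning, and the grouped id is derived only for display. The catch is that `task_id` alone must then be unique across the whole DAG.

```python
from typing import Optional


class Operator:
    """Toy operator: task_id keeps its original meaning (the DB key)."""

    def __init__(self, task_id: str, group_id: Optional[str] = None) -> None:
        self.task_id = task_id    # unchanged, safe to use in DB queries
        self.group_id = group_id  # UI-only metadata

    @property
    def ui_node_id(self) -> str:
        # Hypothetical visualization-only id; never used as a DB key.
        return f"{self.group_id}.{self.task_id}" if self.group_id else self.task_id


op = Operator("task3", group_id="group234.group34")
print(op.task_id)     # task3
print(op.ui_node_id)  # group234.group34.task3
```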

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -626,3 +653,60 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
         if ver != cls.SERIALIZER_VERSION:
             raise ValueError("Unsure how to deserialize version {!r}".format(ver))
         return cls.deserialize_dag(serialized_obj['dag'])
+
+
+class SerializedTaskGroup(TaskGroup, BaseSerialization):
+    """
+    A JSON serializable representation of TaskGroup.
+    """
+    @classmethod
+    def serialize_task_group(cls, task_group: TaskGroup) -> Union[Dict[str, Any], None]:
+        """Serializes TaskGroup into a JSON object.
+        """
+        if not task_group:
+            return None
+
+        serialize_group = {}
+        serialize_group["_group_id"] = task_group._group_id  # pylint: disable=protected-access
+
+        serialize_group['children'] = {  # type: ignore
+            label: (DAT.OP, child.task_id)
+            if isinstance(child, BaseOperator) else
+            (DAT.TASK_GROUP, SerializedTaskGroup.serialize_task_group(child))
+            for label, child in task_group.children.items()
+        }
+        serialize_group['tooltip'] = task_group.tooltip
+        serialize_group['ui_color'] = task_group.ui_color
+        serialize_group['ui_fgcolor'] = task_group.ui_fgcolor

Review comment:
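       Nitpick: it looks like these individual dictionary set operations could be combined into the dict initialization statement:
   
   ```python
   serialize_group = {
       "_group_id": task_group._group_id,  # pylint: disable=protected-access
       "tooltip": task_group.tooltip,
       "ui_color": task_group.ui_color,
       "ui_fgcolor": task_group.ui_fgcolor,
       # ... and likewise for "children"
   }
   ```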
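A standalone sketch of the nested shape `serialize_task_group` produces, written with a single dict literal as suggested. It assumes plain dicts in place of `TaskGroup`, bare `task_id` strings in place of `BaseOperator`, and string tags in place of the `DAT` enum (all hypothetical simplifications): operators are stored by `task_id` only and subgroups recurse.

```python
import json
from typing import Any, Dict, Optional

# Hypothetical stand-ins for DAT.OP and DAT.TASK_GROUP.
OP, TASK_GROUP = "operator", "taskgroup"


def serialize_group(group: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """group = {"_group_id": ..., "children": {label: task_id | subgroup dict}}."""
    if not group:
        return None
    return {
        "_group_id": group["_group_id"],
        "children": {
            label: (OP, child)
            if isinstance(child, str)
            else (TASK_GROUP, serialize_group(child))
            for label, child in group["children"].items()
        },
    }


root = {
    "_group_id": None,
    "children": {
        "task1": "task1",
        "group234": {"_group_id": "group234", "children": {"task2": "group234.task2"}},
    },
}
encoded = json.loads(json.dumps(serialize_group(root)))
print(encoded["children"]["group234"])
# ['taskgroup', {'_group_id': 'group234', 'children': {'task2': ['operator', 'group234.task2']}}]
```

Note that the `(tag, payload)` tuples come back as lists after a JSON round trip, so a deserializer should unpack the pairs rather than compare them with tuples.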

##########
File path: airflow/www/views.py
##########
@@ -147,6 +147,41 @@ def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
     }
 
 
+def task_group_to_dict(task_group):
+    """
+    Create a nested dict representation of this TaskGroup and its children used for
+    rendering web UI.
+    """
+    from airflow.models.baseoperator import BaseOperator
+
+    if isinstance(task_group, BaseOperator):
+        return {
+            'id': task_group.task_id,
+            'value': {
+                'label': task_group.label,
+                'labelStyle': "fill:{0};".format(task_group.ui_fgcolor),
+                'style': "fill:{0};".format(task_group.ui_color),
+                'rx': 5,
+                'ry': 5,
+            }
+        }
+
+    return {
+        "id": task_group.group_id,
+        'value': {
+            'label': task_group.label,
+            'labelStyle': "fill:{0};".format(task_group.ui_fgcolor),

Review comment:
       Nitpick: it would be good to use f-strings consistently instead of `str.format`; I noticed you are already using an f-string for the style key a line below.
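For illustration, the `labelStyle` value built both ways, using `#000`, the foreground color that appears in the expected JSON of the tests:

```python
ui_fgcolor = "#000"

# Both spellings build the same style string; the f-string is just more direct.
label_style_format = "fill:{0};".format(ui_fgcolor)
label_style_fstring = f"fill:{ui_fgcolor};"

assert label_style_format == label_style_fstring == "fill:#000;"
```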







[GitHub] [airflow] turbaszek commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-688954105


   > > I love this change! This DAG representation is nice and clean. I'm just wondering if this will work with AIP-31 approach (`@task` decorator). Especially
   > > ```python
   > > @task
   > > def my_task():
   > >     ...
   > > 
   > > with TaskGroup(...) as tg:
   > >     ...
   > > 
   > > r = my_task()
   > > r >> tg
   > > ```
   > 
   > Hi, @turbaszek thanks for the detailed review. I have updated the PR accordingly.
   > 
   > Regarding the question about AIP-31, I just verified it works like a charm. The indentation of your call to `my_task()` needs to be updated to be inside the TaskGroup contextmanager.
   
   The lack of indentation was intentional, to check whether `TaskGroup` works well with `>>` on the `XComArg` returned by invoking `my_task()`: 
   https://github.com/apache/airflow/blob/aaf56f9816ed72e18a3215183c185d379b4e4247/airflow/operators/python.py#L291
   
   At the very least, the type hints here are missing `TaskGroup`:
   https://github.com/apache/airflow/blob/aaf56f9816ed72e18a3215183c185d379b4e4247/airflow/models/xcom_arg.py#L118-L130
   





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-695219505


   Is anyone interested in using TaskGroup in the next 1.10.* release? We can also contribute our own cherry-picked commit to v1-10-test, since we are going to use it ourselves anyway.





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-689468941


   > The lack of indentation was on purpose to check if `TaskGroup` works well with `>>` between `XComArg` which is returned by invoking `my_task()`:
   > https://github.com/apache/airflow/blob/aaf56f9816ed72e18a3215183c185d379b4e4247/airflow/operators/python.py#L291
   > 
   > At least typing here is missing `TaskGroup`:
   > https://github.com/apache/airflow/blob/aaf56f9816ed72e18a3215183c185d379b4e4247/airflow/models/xcom_arg.py#L118-L130
   
   
   Hi @turbaszek, thanks for pointing this out. To demonstrate `TaskGroup` working with `@task`, and to make sure it keeps working, I added `test_build_task_group_with_task_decorator` in `test_task_group.py`. Please check whether that's in line with what you had in mind.
   
   That said, I noticed two small issues that are not related to `TaskGroup`, but rather to `XComArg`:
   
   1. This function inside `xcom_arg.py` returns `self` instead of `other` (returning `other` is what `BaseOperator` does).
   
   ```python
   def __rshift__(self, other):
       """
       Implements XComArg >> op
       """
       self.set_downstream(other)
       return self
   ```
   
   2. `BaseOperator.__lshift__` and `BaseOperator.__rshift__` do not handle `XComArg`.
   
   As a result, these effectively prevent us from chaining operators like this:
   ```python
   tsk_1 >> group234 >> tsk_5
   ```
   
   We would intuitively expect the line above to put `tsk_5` downstream of `group234`. But because `tsk_1` is an `XComArg`, what this line actually does is very surprising: it puts both `group234` and `tsk_5` downstream of `tsk_1`. The intended wiring is equivalent to the following, which is what I added in the test:
   ```python
   tsk_1 >> group234
   tsk_5 << group234
   ```
   
   I'm not sure whether this is a bug or an `XComArg` feature. Since it's not a `TaskGroup` issue, the change (if any) should probably go in a separate PR.
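The surprise comes from Python evaluating `>>` left to right, so whatever `__rshift__` returns becomes the left operand of the next `>>`. A toy reproduction, with hypothetical `Node` and `XComArgLike` classes standing in for `BaseOperator` and `XComArg` (only the return value of `__rshift__` is modeled):

```python
from typing import List


class Node:
    """Toy stand-in for an operator or task group (not Airflow code)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.downstream: List["Node"] = []

    def __rshift__(self, other: "Node") -> "Node":
        self.downstream.append(other)
        return other  # BaseOperator-style: return the right operand so chains continue


class XComArgLike(Node):
    def __rshift__(self, other: "Node") -> "Node":
        self.downstream.append(other)
        return self  # XComArg-style, as quoted above: returns self, not other


tsk_1, group234, tsk_5 = XComArgLike("tsk_1"), Node("group234"), Node("tsk_5")
tsk_1 >> group234 >> tsk_5  # parsed as (tsk_1 >> group234) >> tsk_5

print([n.name for n in tsk_1.downstream])     # ['group234', 'tsk_5']
print([n.name for n in group234.downstream])  # []
```

Returning `other` instead, as `Node.__rshift__` does, would make the same line wire `group234 >> tsk_5`.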





[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r485525588



##########
File path: tests/utils/test_task_group.py
##########
@@ -0,0 +1,523 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pendulum
+import pytest
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.task_group import TaskGroup
+from airflow.www.views import dag_edges, task_group_to_dict
+
+EXPECTED_JSON = {
+    'id': None,
+    'value': {
+        'label': None,
+        'labelStyle': 'fill:#000;',
+        'style': 'fill:CornflowerBlue',
+        'rx': 5,
+        'ry': 5,
+        'clusterLabelPos': 'top',
+    },
+    'tooltip': '',
+    'children': [
+        {
+            'id': 'group234',
+            'value': {
+                'label': 'group234',
+                'labelStyle': 'fill:#000;',
+                'style': 'fill:CornflowerBlue',
+                'rx': 5,
+                'ry': 5,
+                'clusterLabelPos': 'top',
+            },
+            'tooltip': '',
+            'children': [
+                {
+                    'id': 'group234.group34',
+                    'value': {
+                        'label': 'group34',
+                        'labelStyle': 'fill:#000;',
+                        'style': 'fill:CornflowerBlue',
+                        'rx': 5,
+                        'ry': 5,
+                        'clusterLabelPos': 'top',
+                    },
+                    'tooltip': '',
+                    'children': [
+                        {
+                            'id': 'group234.group34.task3',
+                            'value': {
+                                'label': 'task3',
+                                'labelStyle': 'fill:#000;',
+                                'style': 'fill:#e8f7e4;',
+                                'rx': 5,
+                                'ry': 5,
+                            },
+                        },
+                        {
+                            'id': 'group234.group34.task4',
+                            'value': {
+                                'label': 'task4',
+                                'labelStyle': 'fill:#000;',
+                                'style': 'fill:#e8f7e4;',
+                                'rx': 5,
+                                'ry': 5,
+                            },
+                        },
+                        {
+                            'id': 'group234.group34.downstream_join_id',
+                            'value': {
+                                'label': '',
+                                'labelStyle': 'fill:#000;',
+                                'style': 'fill:CornflowerBlue;',
+                                'shape': 'circle',
+                            },
+                        },
+                    ],
+                },
+                {
+                    'id': 'group234.task2',
+                    'value': {
+                        'label': 'task2',
+                        'labelStyle': 'fill:#000;',
+                        'style': 'fill:#e8f7e4;',
+                        'rx': 5,
+                        'ry': 5,
+                    },
+                },
+                {
+                    'id': 'group234.upstream_join_id',
+                    'value': {
+                        'label': '',
+                        'labelStyle': 'fill:#000;',
+                        'style': 'fill:CornflowerBlue;',
+                        'shape': 'circle',
+                    },
+                },
+            ],
+        },
+        {
+            'id': 'task1',
+            'value': {
+                'label': 'task1',
+                'labelStyle': 'fill:#000;',
+                'style': 'fill:#e8f7e4;',
+                'rx': 5,
+                'ry': 5,
+            },
+        },
+        {
+            'id': 'task5',
+            'value': {
+                'label': 'task5',
+                'labelStyle': 'fill:#000;',
+                'style': 'fill:#e8f7e4;',
+                'rx': 5,
+                'ry': 5,
+            },
+        },
+    ],
+}
+
+
+def test_build_task_group_context_manager():
+    execution_date = pendulum.parse("20200101")
+    with DAG("test_build_task_group_context_manager", start_date=execution_date) as dag:
+        task1 = DummyOperator(task_id="task1")
+        with TaskGroup("group234") as group234:
+            _ = DummyOperator(task_id="task2")
+
+            with TaskGroup("group34") as group34:
+                _ = DummyOperator(task_id="task3")
+                _ = DummyOperator(task_id="task4")
+
+        task5 = DummyOperator(task_id="task5")
+        task1 >> group234
+        group34 >> task5
+
+    assert task1.get_direct_relative_ids(upstream=False) == {
+        'group234.group34.task4',
+        'group234.group34.task3',
+        'group234.task2',
+    }
+    assert task5.get_direct_relative_ids(upstream=True) == {
+        'group234.group34.task4',
+        'group234.group34.task3',
+    }
+
+    assert dag.task_group.group_id is None
+    assert dag.task_group.is_root
+    assert set(dag.task_group.children.keys()) == {"task1", "group234", "task5"}
+    assert group34.group_id == "group234.group34"
+
+    assert task_group_to_dict(dag.task_group) == EXPECTED_JSON
+
+
+def test_build_task_group():
+    """
+    This is an alternative syntax to use TaskGroup. It should result in the same TaskGroup
+    as using context manager.
+    """
+    execution_date = pendulum.parse("20200101")
+    dag = DAG("test_build_task_group", start_date=execution_date)
+    task1 = DummyOperator(task_id="task1", dag=dag)
+    group234 = TaskGroup("group234", dag=dag)
+    _ = DummyOperator(task_id="task2", dag=dag, task_group=group234)
+    group34 = TaskGroup("group34", dag=dag, parent_group=group234)
+    _ = DummyOperator(task_id="task3", dag=dag, task_group=group34)
+    _ = DummyOperator(task_id="task4", dag=dag, task_group=group34)
+    task5 = DummyOperator(task_id="task5", dag=dag)
+
+    task1 >> group234
+    group34 >> task5
+
+    assert task_group_to_dict(dag.task_group) == EXPECTED_JSON
+
+
+def extract_node_id(node, include_label=False):
+    ret = {"id": node["id"]}
+    if include_label:
+        ret["label"] = node["value"]["label"]
+    if "children" in node:
+        children = []
+        for child in node["children"]:
+            children.append(extract_node_id(child, include_label=include_label))
+
+        ret["children"] = children
+
+    return ret
+
+
+def test_build_task_group_with_prefix():
+    """
+    Tests that prefix_group_id turns on/off prefixing of task_id with group_id.
+    """
+    execution_date = pendulum.parse("20200101")
+    with DAG("test_build_task_group_with_prefix", start_date=execution_date) as dag:
+        task1 = DummyOperator(task_id="task1")
+        with TaskGroup("group234", prefix_group_id=False) as group234:
+            task2 = DummyOperator(task_id="task2")
+
+            with TaskGroup("group34") as group34:
+                task3 = DummyOperator(task_id="task3")
+
+                with TaskGroup("group4", prefix_group_id=False) as group4:
+                    task4 = DummyOperator(task_id="task4")
+
+        task5 = DummyOperator(task_id="task5")
+        task1 >> group234 >> task5
+
+    assert task2.task_id == "task2"
+    assert group34.group_id == "group34"
+    assert task3.task_id == "group34.task3"
+    assert group4.group_id == "group34.group4"
+    assert task4.task_id == "task4"
+    assert task5.task_id == "task5"
+    assert group234.get_child_by_label("task2") == task2
+    assert group234.get_child_by_label("group34") == group34
+    assert group4.get_child_by_label("task4") == task4
+
+    assert extract_node_id(task_group_to_dict(dag.task_group), include_label=True) == {
+        'id': None,
+        'label': None,
+        'children': [
+            {
+                'id': 'group234',
+                'label': 'group234',
+                'children': [
+                    {
+                        'id': 'group34',
+                        'label': 'group34',
+                        'children': [
+                            {
+                                'id': 'group34.group4',
+                                'label': 'group4',
+                                'children': [{'id': 'task4', 'label': 'task4'}],
+                            },
+                            {'id': 'group34.task3', 'label': 'task3'},
+                            {'id': 'group34.downstream_join_id', 'label': ''},
+                        ],
+                    },
+                    {'id': 'task2', 'label': 'task2'},
+                    {'id': 'group234.upstream_join_id', 'label': ''},
+                ],
+            },
+            {'id': 'task1', 'label': 'task1'},
+            {'id': 'task5', 'label': 'task5'},
+        ],
+    }
+
+
+def test_build_task_group_with_task_decorator():
+    """
+    Test that TaskGroup can be used with the @task decorator.
+    """
+    from airflow.operators.python import task
+
+    @task
+    def task_1():
+        print("task_1")
+
+    @task
+    def task_2():
+        return "task_2"
+
+    @task
+    def task_3():
+        return "task_3"
+
+    @task
+    def task_4(task_2_output, task_3_output):
+        print(task_2_output, task_3_output)
+
+    @task
+    def task_5():
+        print("task_5")
+
+    execution_date = pendulum.parse("20200101")
+    with DAG("test_build_task_group_with_task_decorator", start_date=execution_date) as dag:
+        tsk_1 = task_1()
+
+        with TaskGroup("group234") as group234:
+            tsk_2 = task_2()
+            tsk_3 = task_3()
+            _ = task_4(tsk_2, tsk_3)
+
+        tsk_5 = task_5()
+
+        tsk_1 >> group234
+        tsk_5 << group234

Review comment:
       Can we add something like this here to make sure that it's truly chained? 
   ```py
   assert tsk_1.operator in tsk_2.operator.upstream_list
   ```
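What "truly chained" should mean for a group can be sketched with toy classes. This assumes the rule is "an edge into a group attaches to the group's roots (tasks with no upstream inside the group), and an edge out of a group starts from its leaves", which is consistent with the expectations in `test_build_task_group_context_manager`; the `Task` and `Group` classes are hypothetical, not Airflow's.

```python
from typing import List, Set, Union


class Task:
    """Toy task (not Airflow's BaseOperator)."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: Set["Task"] = set()
        self.downstream: Set["Task"] = set()

    def __rshift__(self, other: "Union[Task, Group]") -> "Union[Task, Group]":
        # An edge into a group attaches to each of the group's roots.
        targets = other.roots() if isinstance(other, Group) else [other]
        for target in targets:
            self.downstream.add(target)
            target.upstream.add(self)
        return other  # return the right operand so a >> b >> c keeps chaining


class Group:
    """Toy group: a list of tasks, used only to fan edges in and out."""

    def __init__(self, tasks: List[Task]) -> None:
        self.tasks = tasks

    def roots(self) -> List[Task]:
        # Tasks with no upstream task inside this group.
        members = set(self.tasks)
        return [t for t in self.tasks if not (t.upstream & members)]

    def leaves(self) -> List[Task]:
        # Tasks with no downstream task inside this group.
        members = set(self.tasks)
        return [t for t in self.tasks if not (t.downstream & members)]

    def __rshift__(self, other: Task) -> Task:
        for leaf in self.leaves():
            leaf >> other
        return other


# Mirrors the decorator test: task_4 depends on task_2 and task_3 inside group234.
t2, t3, t4 = Task("group234.task_2"), Task("group234.task_3"), Task("group234.task_4")
t2 >> t4
t3 >> t4
group234 = Group([t2, t3, t4])
t1, t5 = Task("task_1"), Task("task_5")

t1 >> group234 >> t5  # chains here because Task.__rshift__ returns `other`

print(sorted(t.task_id for t in t1.downstream))  # ['group234.task_2', 'group234.task_3']
print(sorted(t.task_id for t in t5.upstream))    # ['group234.task_4']
```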







[GitHub] [airflow] kaxil commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-691026924









[GitHub] [airflow] potiuk commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-691054773









[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484799116



##########
File path: airflow/models/baseoperator.py
##########
@@ -57,6 +58,9 @@
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.weight_rule import WeightRule
 
+if TYPE_CHECKING:
+    from airflow.utils.task_group import TaskGroup  # pylint: disable=cyclic-import

Review comment:
       There's actually no cyclic dependency apart from the imports needed for type checking at module scope. E.g. in order to add this type hint in baseoperator.py, we need to import `TaskGroup` under `TYPE_CHECKING`:
   ```
   task_group: Optional["TaskGroup"] = None,
   ```
   
   In task_group.py it's the same. In order to put `BaseOperator` in the type hints, `BaseOperator` needs to be imported.
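The pattern described here can be shown with the standard library alone: `typing.TYPE_CHECKING` is `False` at runtime, so the guarded import is only seen by static type checkers and cannot take part in a runtime import cycle. In this sketch `decimal.Decimal` stands in for `TaskGroup`, and `describe` is a hypothetical function used only for illustration.

```python
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Only evaluated by type checkers such as mypy; skipped at runtime,
    # so this import can never form part of a runtime import cycle.
    from decimal import Decimal


def describe(value: Optional["Decimal"] = None) -> str:
    # The quoted annotation is never resolved at runtime, so the name
    # `Decimal` does not need to exist in this module's namespace.
    return "none" if value is None else f"value={value}"


# At runtime the guarded import did not happen.
IMPORTED_AT_RUNTIME = "Decimal" in globals()
```

The string annotation (`"Decimal"`) is what makes this work; an unquoted `Optional[Decimal]` would raise `NameError` when the function is defined.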







[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484264291



##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,392 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+A TaskGroup is a collection of closely related tasks on the same DAG that should be grouped
+together when the DAG is displayed graphically.
+"""
+
+from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException, DuplicateTaskIdFound
+
+if TYPE_CHECKING:
+    from airflow.models.baseoperator import BaseOperator
+    from airflow.models.dag import DAG
+
+
+class TaskGroup:
+    """
+    A collection of tasks. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+
+    :param group_id: a unique, meaningful id for the TaskGroup. group_id must not conflict
+        with group_id of TaskGroup or task_id of tasks in the DAG. Root TaskGroup has group_id
+        set to None.
+    :type group_id: str
+    :param prefix_group_id: If set to True, child task_id and group_id will be prefixed with
+        this TaskGroup's group_id. If set to False, child task_id and group_id are not prefixed.
+        Default is True.
+    :type prefix_group_id: bool
+    :param parent_group: The parent TaskGroup of this TaskGroup. parent_group is set to None
+        for the root TaskGroup.
+    :type parent_group: TaskGroup
+    :param dag: The DAG that this TaskGroup belongs to.
+    :type dag: airflow.models.DAG
+    :param tooltip: The tooltip of the TaskGroup node when displayed in the UI
+    :type tooltip: str
+    :param ui_color: The fill color of the TaskGroup node when displayed in the UI
+    :type ui_color: str
+    :param ui_fgcolor: The label color of the TaskGroup node when displayed in the UI
+    :type ui_fgcolor: str
+    """
+
+    def __init__(
+        self,
+        group_id: Optional[str],
+        prefix_group_id: bool = True,
+        parent_group: Optional["TaskGroup"] = None,
+        dag: Optional["DAG"] = None,
+        tooltip: str = "",
+        ui_color: str = "CornflowerBlue",
+        ui_fgcolor: str = "#000",
+    ):
+        from airflow.models.dag import DagContext
+
+        self.prefix_group_id = prefix_group_id
+
+        if group_id is None:
+            # This creates a root TaskGroup.
+            if parent_group:
+                raise AirflowException("Root TaskGroup cannot have parent_group")
+            # used_group_ids is shared across all TaskGroups in the same DAG to keep track
+            # of used group_id to avoid duplication.
+            self.used_group_ids: Set[Optional[str]] = set()
+            self._parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+
+            dag = dag or DagContext.get_current_dag()
+
+            if not parent_group and not dag:
+                raise AirflowException("TaskGroup can only be used inside a dag")
+
+            self._parent_group = parent_group or TaskGroupContext.get_current_task_group(dag)
+            if not self._parent_group:
+                raise AirflowException("TaskGroup must have a parent_group except for the root TaskGroup")
+            self.used_group_ids = self._parent_group.used_group_ids
+
+        self._group_id = group_id
+        if self.group_id in self.used_group_ids:
+            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
+        self.used_group_ids.add(self.group_id)
+        self.used_group_ids.add(self.downstream_join_id)
+        self.used_group_ids.add(self.upstream_join_id)
+        self.children: Dict[str, Union["BaseOperator", "TaskGroup"]] = {}
+        if self._parent_group:
+            self._parent_group.add(self)
+
+        self.tooltip = tooltip
+        self.ui_color = ui_color
+        self.ui_fgcolor = ui_fgcolor
+
+        # Keep track of TaskGroups or tasks that depend on this entire TaskGroup separately
+        # so that we can optimize the number of edges when entire TaskGroups depend on each other.
+        self.upstream_group_ids: Set[Optional[str]] = set()
+        self.downstream_group_ids: Set[Optional[str]] = set()
+        self.upstream_task_ids: Set[Optional[str]] = set()
+        self.downstream_task_ids: Set[Optional[str]] = set()
+
+    @classmethod
+    def create_root(cls, dag: "DAG"):
+        """
+        Create a root TaskGroup with no group_id or parent.
+        """
+        return cls(group_id=None, dag=dag)
+
+    @property
+    def is_root(self):
+        """
+        Returns True if this TaskGroup is the root TaskGroup. Otherwise False
+        """
+        return not self.group_id
+
+    def __iter__(self):
+        for child in self.children.values():
+            if isinstance(child, TaskGroup):
+                for inner_task in child:
+                    yield inner_task
+            else:
+                yield child
+
+    def add(self, task: Union["BaseOperator", "TaskGroup"]) -> None:
+        """
+        Add a task to this TaskGroup.
+        """
+        key = task.group_id if isinstance(task, TaskGroup) else task.task_id
+
+        if key in self.children:
+            raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
+
+        if isinstance(task, TaskGroup):
+            if task.children:
+                raise AirflowException("Cannot add a non-empty TaskGroup")
+
+        self.children[key] = task  # type: ignore
+
+    @property
+    def group_id(self) -> Optional[str]:
+        """
+        group_id of this TaskGroup.
+        """
+        if self._parent_group and self._parent_group.prefix_group_id and self._parent_group.group_id:
+            return self._parent_group.child_id(self._group_id)
+
+        return self._group_id
+
+    @property
+    def label(self):
+        """
+        group_id excluding parent's group_id used as the node label in UI.
+        """
+        return self._group_id
+
+    def _set_relative(
+            self,
+            task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"],
+            upstream: bool = False
+    ) -> None:
+        """
+        Call set_upstream/set_downstream for all root/leaf tasks within this TaskGroup.
+        Update upstream_group_ids/downstream_group_ids/upstream_task_ids/downstream_task_ids.
+        """
+        from airflow.models.baseoperator import BaseOperator
+
+        if upstream:
+            for task in self.get_roots():
+                task.set_upstream(task_or_task_list)
+        else:
+            for task in self.get_leaves():
+                task.set_downstream(task_or_task_list)
+
+        # Update upstream_group_ids/downstream_group_ids/upstream_task_ids/downstream_task_ids
+        # accordingly so that we can reduce the number of edges when displaying Graph View.
+        if isinstance(task_or_task_list, TaskGroup):
+            # Handles TaskGroup and TaskGroup
+            if upstream:
+                parent, child = (self, task_or_task_list)
+            else:
+                parent, child = (task_or_task_list, self)
+
+            parent.upstream_group_ids.add(child.group_id)
+            child.downstream_group_ids.add(parent.group_id)
+        else:
+            # Handles TaskGroup and task or list of tasks
+            try:
+                task_list = list(task_or_task_list)  # type: ignore
+            except TypeError:
+                task_list = [task_or_task_list]  # type: ignore
+
+            for task in task_list:
+                if not isinstance(task, BaseOperator):
+                    raise AirflowException("Relationships can only be set between TaskGroup or operators; "
+                                           f"received {task.__class__.__name__}")
+
+                if upstream:
+                    self.upstream_task_ids.add(task.task_id)
+                else:
+                    self.downstream_task_ids.add(task.task_id)
+
+    def set_downstream(
+        self, task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"]
+    ) -> None:
+        """
+        Set a TaskGroup/task/list of tasks downstream of this TaskGroup.
+        """
+        self._set_relative(task_or_task_list, upstream=False)
+
+    def set_upstream(
+        self, task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"]
+    ) -> None:
+        """
+        Set a TaskGroup/task/list of tasks upstream of this TaskGroup.
+        """
+        self._set_relative(task_or_task_list, upstream=True)
+
+    def __enter__(self):
+        TaskGroupContext.push_context_managed_task_group(self)
+        return self
+
+    def __exit__(self, _type, _value, _tb):
+        TaskGroupContext.pop_context_managed_task_group()
+
+    def has_task(self, task: "BaseOperator") -> bool:
+        """
+        Returns True if this TaskGroup or its children TaskGroups contain the given task.
+        """
+        if task.task_id in self.children:
+            return True
+
+        return any(child.has_task(task) for child in self.children.values() if isinstance(child, TaskGroup))
+
+    def get_roots(self) -> Generator["BaseOperator", None, None]:
+        """
+        Returns a generator of tasks that are root tasks, i.e. those with no upstream
+        dependencies within the TaskGroup.
+        """
+        for task in self:
+            if not any(self.has_task(parent) for parent in task.get_direct_relatives(upstream=True)):
+                yield task
+
+    def get_leaves(self) -> Generator["BaseOperator", None, None]:
+        """
+        Returns a generator of tasks that are leaf tasks, i.e. those with no downstream
+        dependencies within the TaskGroup
+        """
+        for task in self:
+            if not any(self.has_task(child) for child in task.get_direct_relatives(upstream=False)):
+                yield task
+
+    def __rshift__(self, other):
+        """
+        Implements Self >> Other == self.set_downstream(other)
+        """
+        self.set_downstream(other)
+        return other
+
+    def __lshift__(self, other):
+        """
+        Implements Self << Other == self.set_upstream(other)
+        """
+        self.set_upstream(other)
+        return other
+
+    def __rrshift__(self, other):
+        """
+        Called for [Operator] >> TaskGroup because lists don't have
+        __rshift__ operators.
+        """
+        self.__lshift__(other)
+        return self
+
+    def __rlshift__(self, other):
+        """
+        Called for [Operator] << TaskGroup because lists don't have
+        __lshift__ operators.
+        """
+        self.__rshift__(other)
+        return self
+
+    def child_id(self, label):
+        """
+        Prefix label with group_id if prefix_group_id is True. Otherwise return the label
+        as-is.
+        """
+        if self.prefix_group_id and self.group_id:
+            return f"{self.group_id}.{label}"
+
+        return label
+
+    @property
+    def upstream_join_id(self):
+        """
+        If this TaskGroup has immediate upstream TaskGroups or tasks, a dummy node called
+        upstream_join_id will be created in Graph View to join the outgoing edges from this
+        TaskGroup to reduce the total number of edges needed to be displayed.
+        """
+        return f"{self.group_id}.upstream_join_id"
+
+    @property
+    def downstream_join_id(self):

Review comment:
       ```suggestion
       def downstream_join_id(self) -> str:
   ```
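The `upstream_join_id`/`downstream_join_id` docstrings describe a dummy node that reduces the number of edges drawn in Graph View. A back-of-the-envelope sketch of why: wiring each of m leaves of one group directly to each of n roots of the next needs m × n edges, while routing through a single join node needs m + n. The functions below are illustrative only and are not part of the PR.

```python
from typing import List, Tuple

Edge = Tuple[str, str]


def direct_edges(leaves: List[str], roots: List[str]) -> List[Edge]:
    # Every leaf of the upstream group is connected to every downstream root.
    return [(leaf, root) for leaf in leaves for root in roots]


def joined_edges(leaves: List[str], roots: List[str], join_id: str) -> List[Edge]:
    # All leaves feed one dummy join node, which then fans out to the roots.
    return [(leaf, join_id) for leaf in leaves] + [(join_id, root) for root in roots]


leaves = [f"group1.leaf_{i}" for i in range(4)]
roots = [f"group2.root_{i}" for i in range(5)]
direct = direct_edges(leaves, roots)
joined = joined_edges(leaves, roots, "group1.downstream_join_id")
```

Here 4 leaves and 5 roots give 20 direct edges versus 9 through the join node. The saving only appears once both sides have a few tasks: for 2 × 2 the counts tie at 4, and for a single leaf and root the join node adds an edge.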







[GitHub] [airflow] houqp commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-674357185


   @yuqian90 I think that's a good approach. This extra metadata will probably need to be added to more than one endpoint, but we can start with `/dags/{dag_id}/tasks`.





[GitHub] [airflow] kaxil commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-691032987


   > > Fix for static check failure has been merged, can you rebase once more please
   > 
   > Thanks @kaxil, I just did another rebase. Re: @turbaszek, I added the following to silence the cyclic-import warning. The "cyclic" import is only used for the type hints. It's unfortunate that pylint does not respect the `if TYPE_CHECKING` condition.
   > 
   > ```
   > if TYPE_CHECKING:
   >     from airflow.utils.task_group import TaskGroup  # pylint: disable=cyclic-import
   > ```
   
   True, related Pylint issues:
   
    * [PyCQA/pylint#3285](https://github.com/PyCQA/pylint/issues/3285)
    * [PyCQA/pylint#3382](https://github.com/PyCQA/pylint/issues/3382)





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-690919898









[GitHub] [airflow] turbaszek commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-690924584









[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-681930066


   > Do you mind elaborate a bit more on the impact to TreeView please? Would it be rendered just like the subdag operators? Thanks.
   
   In the current implementation of this PR, the Tree View is not touched. In other words, the grouping is only relevant in Graph View. In the Tree View, all tasks, including the ones in nested TaskGroups, are shown as if they were on the same DAG (because that's what they are under the hood).
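Because a TaskGroup only prefixes ids, nested groups collapse to ordinary task_ids on a single DAG, which is what the Tree View ends up listing. A minimal sketch of the prefixing rule from the quoted `child_id` (`{group_id}.{label}` when `prefix_group_id` is true and the group is not the root); representing the groups as nested dicts is an assumption made for illustration.

```python
from typing import Dict, List, Optional

# A group maps child labels to either a nested group (dict) or a task (None).
Tree = Dict[str, Optional["Tree"]]


def child_id(group_id: Optional[str], label: str, prefix_group_id: bool = True) -> str:
    # Same rule as TaskGroup.child_id: prefix only when enabled and not at the root.
    if prefix_group_id and group_id:
        return f"{group_id}.{label}"
    return label


def flatten(tree: Tree, group_id: Optional[str] = None) -> List[str]:
    # Walk nested groups and return the flat task_ids the DAG actually holds.
    task_ids: List[str] = []
    for label, sub in tree.items():
        full_id = child_id(group_id, label)
        if sub is None:
            task_ids.append(full_id)
        else:
            task_ids.extend(flatten(sub, full_id))
    return task_ids


dag_tree: Tree = {
    "task_1": None,
    "group234": {"task_2": None, "task_3": None, "inner": {"task_4": None}},
    "task_5": None,
}
```

`flatten(dag_tree)` yields ids such as `group234.inner.task_4`: one flat list of tasks, with the group structure surviving only as a naming convention.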





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-676241782


   > This needs good documentation coverage too
   
   Thanks. I added a section here:
   https://github.com/apache/airflow/blob/545c69704ff7ad199ff58b7a01b3c60fc222b7c2/docs/concepts.rst#taskgroup





[GitHub] [airflow] BhuviTheDataGuy commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
BhuviTheDataGuy commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-708155288


   It's a nice feature. Any idea when it'll be available (GA release)?
   





[GitHub] [airflow] yuqian90 closed pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #10153:
URL: https://github.com/apache/airflow/pull/10153


   





[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r472097193



##########
File path: airflow/models/baseoperator.py
##########
@@ -359,9 +363,12 @@ def __init__(
         do_xcom_push: bool = True,
         inlets: Optional[Any] = None,
         outlets: Optional[Any] = None,
+        task_group: Optional["TaskGroup"] = None,
         **kwargs
     ):
         from airflow.models.dag import DagContext
+        from airflow.utils.task_group import TaskGroupContext

Review comment:
       There is no circular import here. However, I thought it's better to put the import inline if it's only needed in this function, because in the future people might need to import BaseOperator inside task_group.py. Also, it's placed at this line because I saw `DagContext` being imported here.







[GitHub] [airflow] houqp commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r472499741



##########
File path: airflow/models/baseoperator.py
##########
@@ -359,9 +363,12 @@ def __init__(
         do_xcom_push: bool = True,
         inlets: Optional[Any] = None,
         outlets: Optional[Any] = None,
+        task_group: Optional["TaskGroup"] = None,
         **kwargs
     ):
         from airflow.models.dag import DagContext
+        from airflow.utils.task_group import TaskGroupContext

Review comment:
       There is runtime overhead for every import statement, even when the module has already been imported. The performance implication probably won't matter much in this particular case, but in general it's good coding practice to keep imports at the top of the file unless there is no other choice.
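A small standard-library illustration of this point: a function-level `import` of an already-loaded module does not re-execute the module (it is fetched from the `sys.modules` cache), but the import statement itself still runs on every call. Timings vary by machine, so the sketch prints them rather than asserting anything about them; `inline_import` and `top_level_lookup` are illustrative names.

```python
import json as top_level_json
import sys
import timeit


def inline_import():
    # The import statement runs on every call; after the first load the module
    # is simply fetched from the sys.modules cache rather than re-executed.
    import json
    return json


def top_level_lookup():
    # Only a global name lookup; no import machinery is involved.
    return top_level_json


if __name__ == "__main__":
    n = 100_000
    print("same module object:", inline_import() is sys.modules["json"])
    print("inline import:", timeit.timeit(inline_import, number=n))
    print("top-level    :", timeit.timeit(top_level_lookup, number=n))
```

Both functions return the same module object; the difference is only the per-call cost of going through the import machinery.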







[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r469280890



##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+TaskGroup
+"""
+
+from typing import List, Optional
+
+
+class TaskGroup:
+    """
+    A collection of tasks. Tasks within the TaskGroup have their task_id prefixed with the
+    group_id of the TaskGroup. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+    """
+    def __init__(self, group_id, parent_group=None):
+        if group_id is None:
+            # This creates a root TaskGroup.
+            self.parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+            self.parent_group = parent_group or TaskGroupContext.get_current_task_group()
+
+        self._group_id = group_id
+        if self.parent_group:
+            self.parent_group.add(self)
+        self.children = {}
+
+    @classmethod
+    def create_root(cls):
+        """
+        Create a root TaskGroup with no group_id or parent.
+        """
+        return cls(group_id=None)
+
+    @property
+    def is_root(self):
+        """
+        Returns True if this TaskGroup is the root TaskGroup. Otherwise False
+        """
+        return not self.parent_group
+
+    def __iter__(self):
+        for child in self.children.values():
+            if isinstance(child, TaskGroup):
+                for inner_task in child:
+                    yield inner_task
+            else:
+                yield child
+
+    def add(self, task):
+        """
+        Add a task to this TaskGroup.
+        """
+        if task.label in self.children:
+            raise ValueError(f"Duplicate label {task.label} in {self.group_id}")
+        self.children[task.label] = task
+
+    @property
+    def label(self):
+        """
+        group_id excluding parent's group_id.
+        """
+        return self._group_id
+
+    @property
+    def group_id(self):
+        """
+        group_id is prefixed with parent group_id if applicable.
+        """
+        if not self.group_ids:
+            return None
+
+        return ".".join(self.group_ids)
+
+    @property
+    def group_ids(self):
+        """
+        Returns all the group_id of nested TaskGroups as a list, starting from the top.
+        """
+        return list(self._group_ids())[1:]
+
+    def _group_ids(self):
+        if self.parent_group:
+            for group_id in self.parent_group._group_ids():  # pylint: disable=protected-access
+                yield group_id
+
+        yield self._group_id
+
+    def set_downstream(self, task_or_task_list) -> None:
+        """
+        Call set_downstream for all leaf tasks within this TaskGroup.
+        """
+        for task in self.get_leaves():
+            task.set_downstream(task_or_task_list)
+
+    def set_upstream(self, task_or_task_list) -> None:
+        """
+        Call set_upstream for all root tasks within this TaskGroup.
+        """
+        for task in self.get_roots():
+            task.set_upstream(task_or_task_list)
+
+    def __enter__(self):
+        TaskGroupContext.push_context_managed_task_group(self)
+        return self
+
+    def __exit__(self, _type, _value, _tb):
+        TaskGroupContext.pop_context_managed_task_group()
+
+    def get_roots(self):
+        """
+        Returns a generator of tasks that are root tasks, i.e. those with no upstream
+        dependencies within the TaskGroup.
+        """
+        for task in self:
+            if not any(parent.is_in_task_group(self.group_ids)
+                       for parent in task.get_direct_relatives(upstream=True)):
+                yield task
+
+    def get_leaves(self):
+        """
+        Returns a generator of tasks that are leaf tasks, i.e. those with no downstream
+        dependencies within the TaskGroup
+        """
+        for task in self:
+            if not any(child.is_in_task_group(self.group_ids)
+                       for child in task.get_direct_relatives(upstream=False)):
+                yield task
+
+    def __rshift__(self, other):
+        """
+        Implements Self >> Other == self.set_downstream(other)
+        """
+        self.set_downstream(other)
+        return other
+
+    def __lshift__(self, other):
+        """
+        Implements Self << Other == self.set_upstream(other)
+        """
+        self.set_upstream(other)
+        return other
+
+    def __rrshift__(self, other):
+        """
+        Called for [Operator] >> TaskGroup because lists don't have
+        __rshift__ operators.
+        """
+        self.__lshift__(other)
+        return self
+
+    def __rlshift__(self, other):
+        """
+        Called for [Operator] << TaskGroup because lists don't have
+        __lshift__ operators.
+        """
+        self.__rshift__(other)
+        return self
+
+    @classmethod
+    def build_task_group(cls, tasks):
+        """
+        Put tasks into TaskGroup.
+        """
+        root = TaskGroup.create_root()
+        for task in tasks:
+            current = root
+            for label in task.task_group_ids:
+                if label not in current.children:
+                    child_group = TaskGroup(group_id=label, parent_group=current)
+                else:
+                    child_group = current.children[label]
+                current = child_group
+            current.add(task)
+        return root
+
+
+class TaskGroupContext:
+    """
+    TaskGroup context is used to keep the current TaskGroup when TaskGroup is used as ContextManager.
+    """
+
+    _context_managed_task_group: Optional[TaskGroup] = None
+    _previous_context_managed_task_groups: List[TaskGroup] = []
+
+    @classmethod
+    def push_context_managed_task_group(cls, task_group: TaskGroup):
+        """
+        Push a TaskGroup into the list of managed TaskGroups.
+        """
+        if cls._context_managed_task_group:
+            cls._previous_context_managed_task_groups.append(cls._context_managed_task_group)
+        cls._context_managed_task_group = task_group
+
+    @classmethod
+    def pop_context_managed_task_group(cls) -> Optional[TaskGroup]:
+        """
+        Pops the last TaskGroup from the list of managed TaskGroups and updates the current TaskGroup.
+        """
+        old_task_group = cls._context_managed_task_group
+        if cls._previous_context_managed_task_groups:
+            cls._context_managed_task_group = cls._previous_context_managed_task_groups.pop()
+        else:
+            cls._context_managed_task_group = None
+        return old_task_group
+
+    @classmethod
+    def get_current_task_group(cls) -> Optional[TaskGroup]:
+        """
+        Get the current TaskGroup.
+        """
+        if not cls._context_managed_task_group:
+            return TaskGroup.create_root()

Review comment:
       This isn't an issue at the moment because only `task_group_ids` matters, but you are right that this is a potential booby trap for future maintenance. I'll fix this.
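The trap is that `get_current_task_group()` builds a fresh root `TaskGroup` on every call when no group is active, so two callers never share the same root. One way to avoid it, sketched here with hypothetical `Group` and `GroupContext` classes rather than the real `TaskGroup`/`TaskGroupContext`, is to create the root once and fall back to it whenever the stack of active groups is empty. The later revision quoted earlier in this thread takes a different route, passing the DAG to `create_root(dag)` and `get_current_task_group(dag)`.

```python
from typing import List, Optional


class Group:
    """Hypothetical minimal group: just an id and a parent pointer."""

    def __init__(self, group_id: Optional[str], parent: Optional["Group"] = None) -> None:
        self.group_id = group_id
        self.parent = parent


class GroupContext:
    """Stack of active groups that falls back to a single, reused root."""

    def __init__(self) -> None:
        self._root = Group(None)  # created once, not on every lookup
        self._stack: List[Group] = []

    def current(self) -> Group:
        # Return the innermost active group, or the shared root if none is active.
        return self._stack[-1] if self._stack else self._root

    def push(self, group: Group) -> None:
        self._stack.append(group)

    def pop(self) -> Group:
        return self._stack.pop()


ctx = GroupContext()
outer = Group("outer", parent=ctx.current())
ctx.push(outer)
inner = Group("inner", parent=ctx.current())
ctx.push(inner)
ctx.pop()
ctx.pop()
```

The sketch keeps its state on an instance so it can be exercised in isolation; the PR's `TaskGroupContext` keeps its stack in class attributes instead.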




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-690919898









[GitHub] [airflow] houqp commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r468937005



##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+TaskGroup
+"""
+
+from typing import List, Optional
+
+
+class TaskGroup:
+    """
+    A collection of tasks. Tasks within the TaskGroup have their task_id prefixed with the
+    group_id of the TaskGroup. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+    """
+    def __init__(self, group_id, parent_group=None):
+        if group_id is None:
+            # This creates a root TaskGroup.
+            self.parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+            self.parent_group = parent_group or TaskGroupContext.get_current_task_group()
+
+        self._group_id = group_id
+        if self.parent_group:
+            self.parent_group.add(self)
+        self.children = {}
+
+    @classmethod
+    def create_root(cls):
+        """
+        Create a root TaskGroup with no group_id or parent.
+        """
+        return cls(group_id=None)
+
+    @property
+    def is_root(self):
+        """
+        Returns True if this TaskGroup is the root TaskGroup. Otherwise False
+        """
+        return not self.parent_group
+
+    def __iter__(self):
+        for child in self.children.values():
+            if isinstance(child, TaskGroup):
+                for inner_task in child:
+                    yield inner_task
+            else:
+                yield child
+
+    def add(self, task):
+        """
+        Add a task to this TaskGroup.
+        """
+        if task.label in self.children:
+            raise ValueError(f"Duplicate label {task.label} in {self.group_id}")
+        self.children[task.label] = task
+
+    @property
+    def label(self):
+        """
+        group_id excluding parent's group_id.
+        """
+        return self._group_id
+
+    @property
+    def group_id(self):
+        """
+        group_id is prefixed with parent group_id if applicable.
+        """
+        if not self.group_ids:
+            return None
+
+        return ".".join(self.group_ids)
+
+    @property
+    def group_ids(self):
+        """
+        Returns all the group_id of nested TaskGroups as a list, starting from the top.
+        """
+        return list(self._group_ids())[1:]
+
+    def _group_ids(self):
+        if self.parent_group:
+            for group_id in self.parent_group._group_ids():  # pylint: disable=protected-access
+                yield group_id
+
+        yield self._group_id
+
+    def set_downstream(self, task_or_task_list) -> None:
+        """
+        Call set_downstream for all leaf tasks within this TaskGroup.
+        """
+        for task in self.get_leaves():
+            task.set_downstream(task_or_task_list)
+
+    def set_upstream(self, task_or_task_list) -> None:
+        """
+        Call set_upstream for all root tasks within this TaskGroup.
+        """
+        for task in self.get_roots():
+            task.set_upstream(task_or_task_list)
+
+    def __enter__(self):
+        TaskGroupContext.push_context_managed_task_group(self)
+        return self
+
+    def __exit__(self, _type, _value, _tb):
+        TaskGroupContext.pop_context_managed_task_group()
+
+    def get_roots(self):
+        """
+        Returns a generator of tasks that are root tasks, i.e. those with no upstream
+        dependencies within the TaskGroup.
+        """
+        for task in self:
+            if not any(parent.is_in_task_group(self.group_ids)
+                       for parent in task.get_direct_relatives(upstream=True)):
+                yield task
+
+    def get_leaves(self):
+        """
+        Returns a generator of tasks that are leaf tasks, i.e. those with no downstream
+        dependencies within the TaskGroup
+        """
+        for task in self:
+            if not any(child.is_in_task_group(self.group_ids)
+                       for child in task.get_direct_relatives(upstream=False)):
+                yield task
+
+    def __rshift__(self, other):
+        """
+        Implements Self >> Other == self.set_downstream(other)
+        """
+        self.set_downstream(other)
+        return other
+
+    def __lshift__(self, other):
+        """
+        Implements Self << Other == self.set_upstream(other)
+        """
+        self.set_upstream(other)
+        return other
+
+    def __rrshift__(self, other):
+        """
+        Called for Operator >> [Operator] because lists don't have
+        __rshift__ operators.
+        """
+        self.__lshift__(other)
+        return self
+
+    def __rlshift__(self, other):
+        """
+        Called for Operator << [Operator] because lists don't have
+        __lshift__ operators.
+        """
+        self.__rshift__(other)
+        return self
+
+    @classmethod
+    def build_task_group(cls, tasks):
+        """
+        Put tasks into TaskGroup.
+        """
+        root = TaskGroup.create_root()
+        for task in tasks:
+            current = root
+            for label in task.task_group_ids:
+                if label not in current.children:
+                    child_group = TaskGroup(group_id=label, parent_group=current)
+                else:
+                    child_group = current.children[label]
+                current = child_group
+            current.add(task)
+        return root
+
+
+class TaskGroupContext:
+    """
+    TaskGroup context is used to keep the current TaskGroup when TaskGroup is used as ContextManager.
+    """
+
+    _context_managed_task_group: Optional[TaskGroup] = None
+    _previous_context_managed_task_groups: List[TaskGroup] = []
+
+    @classmethod
+    def push_context_managed_task_group(cls, task_group: TaskGroup):
+        """
+        Push a TaskGroup into the list of managed TaskGroups.
+        """
+        if cls._context_managed_task_group:
+            cls._previous_context_managed_task_groups.append(cls._context_managed_task_group)
+        cls._context_managed_task_group = task_group
+
+    @classmethod
+    def pop_context_managed_task_group(cls) -> Optional[TaskGroup]:
+        """
+        Pops the last TaskGroup from the list of managed TaskGroups and updates the current TaskGroup.
+        """
+        old_task_group = cls._context_managed_task_group
+        if cls._previous_context_managed_task_groups:
+            cls._context_managed_task_group = cls._previous_context_managed_task_groups.pop()
+        else:
+            cls._context_managed_task_group = None
+        return old_task_group
+
+    @classmethod
+    def get_current_task_group(cls) -> Optional[TaskGroup]:
+        """
+        Get the current TaskGroup.
+        """
+        if not cls._context_managed_task_group:
+            return TaskGroup.create_root()

Review comment:
       Should a user expect `_context_managed_task_group` to always point to the current task group? If so, should we set `_context_managed_task_group` to this newly created task group before returning?
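A minimal sketch of the reviewer's suggestion, using hypothetical stand-in classes `Group` and `GroupContext` rather than Airflow's real `TaskGroup`/`TaskGroupContext`: when nothing is on the context stack, the lazily created root is stored as the current group, so repeated calls return the same root instead of a fresh, unreferenced object each time.

```python
from typing import Dict, List, Optional


class Group:
    """Hypothetical stand-in for TaskGroup: a group_id, a parent and named children."""

    def __init__(self, group_id: Optional[str], parent: Optional["Group"] = None) -> None:
        self.group_id = group_id
        self.children: Dict[str, "Group"] = {}
        if group_id is None:
            self.parent = None  # this is a root group
        else:
            # Fall back to the context-managed group, as TaskGroup.__init__ does.
            self.parent = parent or GroupContext.get_current()
            self.parent.children[group_id] = self

    def __enter__(self) -> "Group":
        GroupContext.push(self)
        return self

    def __exit__(self, *exc_info) -> None:
        GroupContext.pop()


class GroupContext:
    """Stack of context-managed groups, mirroring TaskGroupContext's push/pop."""

    _current: Optional[Group] = None
    _previous: List[Group] = []

    @classmethod
    def push(cls, group: Group) -> None:
        if cls._current is not None:
            cls._previous.append(cls._current)
        cls._current = group

    @classmethod
    def pop(cls) -> Optional[Group]:
        old = cls._current
        cls._current = cls._previous.pop() if cls._previous else None
        return old

    @classmethod
    def get_current(cls) -> Group:
        if cls._current is None:
            # Remember the new root instead of returning a throwaway object.
            cls._current = Group(group_id=None)
        return cls._current
```

With this change, groups created outside any `with` block all attach to one root. The trade-off is that the root becomes process-wide state shared by every DAG defined in the same process; the later `BaseOperator` diff in this thread calls `get_current_task_group(dag)`, which suggests the root was eventually tied to the DAG instead.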

##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+TaskGroup
+"""
+
+from typing import List, Optional
+
+
+class TaskGroup:
+    """
+    A collection of tasks. Tasks within the TaskGroup have their task_id prefixed with the
+    group_id of the TaskGroup. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+    """
+    def __init__(self, group_id, parent_group=None):
+        if group_id is None:
+            # This creates a root TaskGroup.
+            self.parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+            self.parent_group = parent_group or TaskGroupContext.get_current_task_group()
+
+        self._group_id = group_id
+        if self.parent_group:
+            self.parent_group.add(self)
+        self.children = {}
+
+    @classmethod
+    def create_root(cls):
+        """
+        Create a root TaskGroup with no group_id or parent.
+        """
+        return cls(group_id=None)
+
+    @property
+    def is_root(self):
+        """
+        Returns True if this TaskGroup is the root TaskGroup. Otherwise False
+        """
+        return not self.parent_group
+
+    def __iter__(self):
+        for child in self.children.values():
+            if isinstance(child, TaskGroup):
+                for inner_task in child:
+                    yield inner_task
+            else:
+                yield child
+
+    def add(self, task):
+        """
+        Add a task to this TaskGroup.
+        """
+        if task.label in self.children:
+            raise ValueError(f"Duplicate label {task.label} in {self.group_id}")
+        self.children[task.label] = task
+
+    @property
+    def label(self):
+        """
+        group_id excluding parent's group_id.
+        """
+        return self._group_id
+
+    @property
+    def group_id(self):
+        """
+        group_id is prefixed with parent group_id if applicable.
+        """
+        if not self.group_ids:
+            return None
+
+        return ".".join(self.group_ids)
+
+    @property
+    def group_ids(self):
+        """
+        Returns all the group_id of nested TaskGroups as a list, starting from the top.
+        """
+        return list(self._group_ids())[1:]
+
+    def _group_ids(self):
+        if self.parent_group:
+            for group_id in self.parent_group._group_ids():  # pylint: disable=protected-access
+                yield group_id
+
+        yield self._group_id
+
+    def set_downstream(self, task_or_task_list) -> None:

Review comment:
       Nit: could you add type hints to the methods in this class?







[GitHub] [airflow] potiuk commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-691054773


   Yeah. This is unfortunate, but I think the cause is not the "TYPE_CHECKING" problem; it is the internal architecture of our "core", which is overcomplicated. Unfortunately, we still have those cyclic dependencies in the project (not really cyclic imports; the name in Pylint is a bit misleading).
   
   IMHO, while "technically" those cyclic imports are not triggered in our service (because of the order in which classes are usually imported when Airflow starts), they make the whole "core" part of Airflow much more complex than it should be, and difficult to reason about (and to check statically). The single-responsibility principle is still not followed in quite a few classes, and it's actually quite difficult to disentangle them: I tried several times, and so did others, but so far every attempt causes a chain reaction of changes that makes it rather difficult.
   
   I am going to make a few more attempts at that (hopefully before 2.0, or while preparing some of the release candidates).





[GitHub] [airflow] yuqian90 closed pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #10153:
URL: https://github.com/apache/airflow/pull/10153












[GitHub] [airflow] yuqian90 edited a comment on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 edited a comment on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-681930066


   > Do you mind elaborate a bit more on the impact to TreeView please? Would it be rendered just like the subdag operators? Thanks.
   
   In the current implementation of this PR, the Tree View is not touched. In other words, the grouping only affects the Graph View. The Tree View shows all tasks, including the ones in nested TaskGroups, as ordinary tasks of the same DAG (because that's what they are under the hood).
   
   That said, we can always improve on this and add the grouping to the Tree View too.
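Because grouped tasks are ordinary tasks of one DAG, a view could in principle rebuild the grouping from the ids alone. A minimal sketch, assuming `task_id`s are prefixed as `f"{group_id}.{label}"` (the rule in this PR's `BaseOperator` change) and that no group shares a name with a task at the same level; `group_by_prefix` is a hypothetical helper, not an Airflow API. It follows the same walk as `TaskGroup.build_task_group`, but over plain strings.

```python
from typing import Any, Dict, Iterable


def group_by_prefix(task_ids: Iterable[str], sep: str = ".") -> Dict[str, Any]:
    """Nest dotted task_ids into dicts: groups map to sub-dicts, labels to task_ids."""
    tree: Dict[str, Any] = {}
    for task_id in task_ids:
        *group_ids, label = task_id.split(sep)
        node = tree
        for group_id in group_ids:
            # Create each nested group the first time it is seen.
            node = node.setdefault(group_id, {})
        node[label] = task_id
    return tree
```

A real implementation would also need to reject a label that collides with a group name, as `TaskGroup.add` does with its duplicate-label check.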





[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r478394967



##########
File path: airflow/models/baseoperator.py
##########
@@ -382,7 +389,16 @@ def __init__(
                 stacklevel=3
             )
         validate_key(task_id)
-        self.task_id = task_id
+        self.label = task_id
+
+        # Prefix task_id with group_id
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        if task_group:
+            self.task_id = f"{task_group.group_id}.{self.label}" if task_group.group_id else self.label

Review comment:
       Hi @houqp, even without TaskGroup, users cannot define tasks with conflicting names in the same DAG. So if we simply remove the notion of `label` from this PR, we can achieve that. The label shown in Graph View would then be the same as the `task_id`, and users would have to keep `task_id` unique across the entire DAG themselves, including across TaskGroups.
   
   The same example would then look like this. Both the `task_id`s kept in `dag.task_dict` and the labels shown in the Graph View would be:
   ```
   section_1_task_1
   section_1_task_2
   section_2_task_1
   section_2_task_2
   ```
   
   ```
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.task_group import TaskGroup
   
   
   def create_section(group_id):
       task1 = DummyOperator(task_id=f"{group_id}_task_1")
       task2 = DummyOperator(task_id=f"{group_id}_task_2")
   
   with DAG(...) as dag:
       with TaskGroup("section_1") as section_1:
           create_section(section_1.group_id)
   
       with TaskGroup("section_2") as section_2:
           create_section(section_2.group_id)
   ```
   
   How does it sound?
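A minimal sketch of the two options being weighed, assuming a plain dict in place of `dag.task_dict`; `make_task_id` and `register` are hypothetical helpers. With prefixing, two groups can reuse the same label; without it, the `task_id` itself must already be unique, which is what the f-string in `create_section` provides.

```python
from typing import Dict, Optional


def make_task_id(label: str, group_id: Optional[str], prefix: bool) -> str:
    """Build a task_id, optionally prefixed with the group_id as in the diff."""
    if prefix and group_id:
        return f"{group_id}.{label}"
    return label


def register(task_dict: Dict[str, str], task_id: str, label: str) -> None:
    """Add a task, rejecting duplicates since task_ids must be unique per DAG."""
    if task_id in task_dict:
        raise ValueError(f"Duplicate task_id {task_id!r}")
    task_dict[task_id] = label


# Option 1: TaskGroup prefixes task_ids, so the label "task_1" can repeat.
prefixed: Dict[str, str] = {}
for group in ("section_1", "section_2"):
    register(prefixed, make_task_id("task_1", group, prefix=True), "task_1")

# Option 2: no prefixing; users build unique ids themselves, like create_section().
manual: Dict[str, str] = {}
for group in ("section_1", "section_2"):
    register(manual, make_task_id(f"{group}_task_1", group, prefix=False), f"{group}_task_1")
```

Option 2 keeps `task_id` semantics unchanged, at the cost of users reusing a bare label such as `"task_1"` in two groups and hitting a duplicate.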







[GitHub] [airflow] kaxil merged pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #10153:
URL: https://github.com/apache/airflow/pull/10153


   





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-691028874


   > Fix for static check failure has been merged, can you rebase once more please
   
   Thanks @kaxil, I just did another rebase. Re: @turbaszek, I added the following to silence the cyclic-import warning. The "cyclic" import is only used for type hints. It's unfortunate that pylint does not respect the `if TYPE_CHECKING` condition.
   
   ```
   from typing import TYPE_CHECKING
   
   if TYPE_CHECKING:
       from airflow.utils.task_group import TaskGroup  # pylint: disable=cyclic-import
   ```
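A small runnable illustration of why the guarded import is harmless at runtime: `typing.TYPE_CHECKING` is `False` when the program runs, so the import inside the `if` block exists only for static type checkers, and annotations that use the name have to be strings. `fractions.Fraction` is merely a stand-in for `airflow.utils.task_group.TaskGroup` so the example runs anywhere; per the comment above, pylint does not honor the guard, hence the disable comment.

```python
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Only type checkers such as mypy follow this import; at runtime the block is
    # skipped, so it cannot take part in an import cycle.
    from fractions import Fraction  # stand-in for the TaskGroup import


def describe(value: Optional["Fraction"]) -> str:
    """The annotation is a string, so the name is never looked up at runtime."""
    return "no value" if value is None else f"value={value}"
```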





[GitHub] [airflow] houqp commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-675145414


   Thanks @yuqian90 for the update. I left one comment on a change that I think we should be more careful about; the rest looks good to me 👍 Would love to hear other people's opinions on this.





[GitHub] [airflow] houqp commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r479914159



##########
File path: airflow/models/baseoperator.py
##########
@@ -382,7 +389,16 @@ def __init__(
                 stacklevel=3
             )
         validate_key(task_id)
-        self.task_id = task_id
+        self.label = task_id
+
+        # Prefix task_id with group_id
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        if task_group:
+            self.task_id = f"{task_group.group_id}.{self.label}" if task_group.group_id else self.label

Review comment:
       > So if we simply remove the notion of label in this PR, we can achieve that. That means the label in Graph View will be the same as task_id. And users have to do their own work to maintain unique task_id across the entire DAG, and across TaskGroups.
   
   This is what I have in mind as well, since we already require users to make sure task_ids are unique today. My previous comment was asking whether there is some special requirement in subdag support that requires us to support task namespacing within the group. Based on your answer, it looks like there isn't.
   
   I would prefer we go with the simpler route, i.e. dropping label, if it meets all our existing requirements. So far, it maintains task_id semantics and comes with lower code complexity, so it's a win-win to me :)







[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r489949739



##########
File path: airflow/models/baseoperator.py
##########
@@ -1120,21 +1132,25 @@ def roots(self) -> List["BaseOperator"]:
         """Required by TaskMixin"""
         return [self]
 
+    @property
+    def leaves(self) -> List["BaseOperator"]:
+        """Required by TaskMixin"""
+        return [self]
+
     def _set_relatives(
         self,
         task_or_task_list: Union[TaskMixin, Sequence[TaskMixin]],
         upstream: bool = False,
     ) -> None:
         """Sets relatives for the task or task list."""
-
-        if isinstance(task_or_task_list, Sequence):
-            task_like_object_list = task_or_task_list
-        else:
-            task_like_object_list = [task_or_task_list]
+        if not isinstance(task_or_task_list, Sequence):
+            task_or_task_list = [task_or_task_list]
 
         task_list: List["BaseOperator"] = []
-        for task_object in task_like_object_list:
-            task_list.extend(task_object.roots)
+        for task_object in task_or_task_list:
+            task_object.update_relative(self, not upstream)

Review comment:
       Hi @turbaszek, I added an `update_relative` method to `TaskMixin` that is a no-op by default. It's called here. `TaskGroup` overrides this method to keep track of its direct upstream/downstream `TaskGroup`s and `BaseOperator`s, as an optimization for the UI. `BaseOperator` and `XComArg` don't need to override it.
   
   This is the `roots` vs. `leaves` distinction that I mentioned on the `TaskMixin` PR. Using it here means we don't need to hardcode another `if isinstance(task_object, TaskGroup)` check. Please see if this looks okay to you.
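A minimal sketch of the pattern described above, with hypothetical stand-ins `TaskLike` (for `TaskMixin`), `Op` (for `BaseOperator`) and `Group` (for `TaskGroup`). Every object exposes `roots` and `leaves`, so `_set_relatives` never checks for groups, and the no-op `update_relative` hook lets a group record its direct neighbours. Which side (`roots` or `leaves`) gets connected in each direction is an assumption consistent with `TaskGroup.set_upstream`/`set_downstream` earlier in this thread, not code taken from the PR.

```python
import collections.abc
from typing import List, Sequence, Set, Union


class TaskLike:
    """Hypothetical stand-in for TaskMixin."""

    @property
    def roots(self) -> List["Op"]:
        raise NotImplementedError

    @property
    def leaves(self) -> List["Op"]:
        raise NotImplementedError

    def update_relative(self, other: "TaskLike", upstream: bool) -> None:
        """Hook for the far side of a new edge; a no-op by default."""


class Op(TaskLike):
    """Hypothetical stand-in for BaseOperator."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream_ids: Set[str] = set()

    @property
    def roots(self) -> List["Op"]:
        return [self]

    @property
    def leaves(self) -> List["Op"]:
        return [self]

    def _set_relatives(self, other: Union[TaskLike, Sequence[TaskLike]], upstream: bool) -> None:
        others = other if isinstance(other, collections.abc.Sequence) else [other]
        for obj in others:
            obj.update_relative(self, not upstream)  # same call as in the diff
            if upstream:
                for leaf in obj.leaves:  # edges leave a group from its leaves
                    leaf.downstream_ids.add(self.task_id)
            else:
                for root in obj.roots:  # edges enter a group at its roots
                    self.downstream_ids.add(root.task_id)

    def set_upstream(self, other: Union[TaskLike, Sequence[TaskLike]]) -> None:
        self._set_relatives(other, upstream=True)

    def set_downstream(self, other: Union[TaskLike, Sequence[TaskLike]]) -> None:
        self._set_relatives(other, upstream=False)


class Group(TaskLike):
    """Hypothetical stand-in for TaskGroup holding a flat list of tasks."""

    def __init__(self, tasks: List[Op]) -> None:
        self.tasks = tasks
        self.upstream_task_ids: Set[str] = set()
        self.downstream_task_ids: Set[str] = set()

    @property
    def roots(self) -> List[Op]:
        # Tasks with no upstream edge from another task in the group.
        targets = {t_id for task in self.tasks for t_id in task.downstream_ids}
        return [task for task in self.tasks if task.task_id not in targets]

    @property
    def leaves(self) -> List[Op]:
        # Tasks with no downstream edge to another task in the group.
        ids = {task.task_id for task in self.tasks}
        return [task for task in self.tasks if not task.downstream_ids & ids]

    def update_relative(self, other: TaskLike, upstream: bool) -> None:
        # Remember direct neighbours so a UI could draw one edge per group.
        if isinstance(other, Op):
            target = self.upstream_task_ids if upstream else self.downstream_task_ids
            target.add(other.task_id)
```

For example, with `a >> b` inside a group, `start.set_downstream(group)` wires `start` to `a` only, and `end.set_upstream(group)` wires only `b` to `end`.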







[GitHub] [airflow] dimberman commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-695161125


   @yuqian90 Heck yes!!! I'm so excited for this 🙌 





[GitHub] [airflow] yuqian90 closed pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #10153:
URL: https://github.com/apache/airflow/pull/10153


   





[GitHub] [airflow] yuqian90 closed pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #10153:
URL: https://github.com/apache/airflow/pull/10153


   





[GitHub] [airflow] yuqian90 closed pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #10153:
URL: https://github.com/apache/airflow/pull/10153


   





[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484262687



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -625,3 +647,69 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
         if ver != cls.SERIALIZER_VERSION:
             raise ValueError("Unsure how to deserialize version {!r}".format(ver))
         return cls.deserialize_dag(serialized_obj['dag'])
+
+
+class SerializedTaskGroup(TaskGroup, BaseSerialization):
+    """
+    A JSON serializable representation of TaskGroup.
+    """
+    @classmethod
+    def serialize_task_group(cls, task_group: TaskGroup) -> Optional[Union[Dict[str, Any]]]:
+        """Serializes TaskGroup into a JSON object.
+        """
+        if not task_group:
+            return None
+
+        serialize_group = {
+            "_group_id": task_group._group_id,  # pylint: disable=protected-access
+            "prefix_group_id": task_group.prefix_group_id,
+            "tooltip": task_group.tooltip,
+            "ui_color": task_group.ui_color,
+            "ui_fgcolor": task_group.ui_fgcolor,
+            "children": {
+                label: (DAT.OP, child.task_id)
+                if isinstance(child, BaseOperator) else
+                (DAT.TASK_GROUP, SerializedTaskGroup.serialize_task_group(child))
+                for label, child in task_group.children.items()
+            },
+            "upstream_group_ids": cls._serialize(list(task_group.upstream_group_ids)),
+            "downstream_group_ids": cls._serialize(list(task_group.downstream_group_ids)),
+            "upstream_task_ids": cls._serialize(list(task_group.upstream_task_ids)),
+            "downstream_task_ids": cls._serialize(list(task_group.downstream_task_ids)),
+
+        }
+
+        return serialize_group
+
+    @classmethod
+    def deserialize_task_group(
+        cls,
+        encoded_group: Dict[str, Any],
+        parent_group: Optional[TaskGroup],
+        task_dict: Dict[str, BaseOperator]
+    ) -> Optional[TaskGroup]:
+        """Deserializes a TaskGroup from a JSON object.

Review comment:
       ```suggestion
           """
           Deserializes a TaskGroup from a JSON object.
   ```
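A minimal sketch of the serialized shape in the hunk above, assuming simplified dataclasses instead of the real classes and string tags in place of `DAT.OP`/`DAT.TASK_GROUP`; `tooltip`, the UI colours and the upstream/downstream id lists are left out. Operators are stored only by `task_id` while nested groups are serialized recursively, which would explain why `deserialize_task_group` takes a `task_dict` of already-deserialized operators.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Union

OP, TASK_GROUP = "operator", "taskgroup"  # hypothetical tags for DAT.OP / DAT.TASK_GROUP


@dataclass
class Task:
    task_id: str


@dataclass
class Group:
    group_id: Optional[str]
    children: Dict[str, Union["Task", "Group"]] = field(default_factory=dict)


def serialize_group(group: Group) -> Dict[str, Any]:
    """Operators become (OP, task_id); child groups are serialized recursively."""
    return {
        "_group_id": group.group_id,
        "children": {
            label: (OP, child.task_id) if isinstance(child, Task)
            else (TASK_GROUP, serialize_group(child))
            for label, child in group.children.items()
        },
    }


def deserialize_group(encoded: Dict[str, Any], task_dict: Dict[str, Task]) -> Group:
    """Rebuild the tree, looking operators up in an existing task_dict."""
    group = Group(encoded["_group_id"])
    for label, (kind, payload) in encoded["children"].items():
        if kind == OP:
            group.children[label] = task_dict[payload]
        else:
            group.children[label] = deserialize_group(payload, task_dict)
    return group
```

Note that a JSON round trip turns each `(tag, payload)` tuple into a two-element list, which the unpacking in `deserialize_group` still accepts.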







[GitHub] [airflow] dimberman commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-687473532


   @yuqian90 This is AWESOME! Thank you for your work on this. I'll leave the review to @kaxil as he knows more about the front-end, but super excited for this!





[GitHub] [airflow] kaxil commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-675145311


   Thanks for the PR, I will definitely take a look at it once Airflow 1.10.12 is released :)





[GitHub] [airflow] kaxil commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-691026924









[GitHub] [airflow] turbaszek commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-693270409


   @kaxil @potiuk @houqp can you please take a look?





[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r479937558



##########
File path: airflow/models/baseoperator.py
##########
@@ -382,7 +389,16 @@ def __init__(
                 stacklevel=3
             )
         validate_key(task_id)
-        self.task_id = task_id
+        self.label = task_id
+
+        # Prefix task_id with group_id
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        if task_group:
+            self.task_id = f"{task_group.group_id}.{self.label}" if task_group.group_id else self.label

Review comment:
       Actually, I think you have a really good point. In fact, after trying this out on some real, existing DAGs, I realized that leaving `task_id` entirely to the user is better. When people convert existing DAGs to use TaskGroups, they don't want to modify the `task_id` (because doing so causes the tasks to appear as new tasks in the DAG).
   
   So I took your suggestion:
   - Removed `label`
   - Do not prefix `task_id` with group_id
   - Require `group_id` and `task_id` to be unique throughout the whole DAG
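When task_ids are no longer prefixed, group_ids and task_ids effectively share one DAG-wide namespace. A task named `extract` in one group therefore collides with `extract` in another group, and a task cannot reuse a group's id either. The following is a minimal sketch of that check. `IdRegistry` and `DuplicateIdError` are hypothetical names. The `task_group.py` diff quoted further down keeps a similar shared `used_group_ids` set for group_ids.

```python
from typing import Set


class DuplicateIdError(ValueError):
    """Hypothetical stand-in for the duplicate-id exception."""


class IdRegistry:
    """A single namespace shared by every task_id and group_id in one DAG."""

    def __init__(self) -> None:
        self._used: Set[str] = set()

    def register(self, node_id: str) -> str:
        if node_id in self._used:
            raise DuplicateIdError(f"'{node_id}' has already been added to the DAG")
        self._used.add(node_id)
        return node_id


registry = IdRegistry()
registry.register("section_1")   # a group_id
registry.register("extract")     # a task_id created inside section_1, kept unprefixed
try:
    registry.register("extract")  # the same task_id in another group now collides
except DuplicateIdError as err:
    print(err)  # 'extract' has already been added to the DAG
```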







[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484261439



##########
File path: airflow/models/baseoperator.py
##########
@@ -57,6 +58,9 @@
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.weight_rule import WeightRule
 
+if TYPE_CHECKING:
+    from airflow.utils.task_group import TaskGroup  # pylint: disable=cyclic-import

Review comment:
       Does `TaskGroup` create a cyclic dependency? We already have a few of them, and if it's possible to avoid another one, we should try to do so.
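The diff above guards the import with `TYPE_CHECKING`. In that pattern, the annotation refers to the class only as a string, and the import runs only under a static type checker. The sketch below illustrates it. The module path `mypkg.task_group` and the class `OperatorSketch` are hypothetical. The guard takes the import off the runtime path, but it does not make the module-level dependency disappear for static analysis. The `# pylint: disable=cyclic-import` comment in the diff suggests that pylint still flags it.

```python
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Only static type checkers (e.g. mypy) evaluate this branch. At runtime
    # TYPE_CHECKING is False, so this import never executes and cannot take part
    # in a runtime import cycle. The module path is hypothetical.
    from mypkg.task_group import TaskGroup


class OperatorSketch:
    """Hypothetical operator that accepts an optional task group."""

    def __init__(self, task_id: str, task_group: Optional["TaskGroup"] = None) -> None:
        # The annotation is a string, so TaskGroup need not exist at runtime.
        self.task_id = task_id
        self.task_group = task_group


op = OperatorSketch(task_id="start")
```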







[GitHub] [airflow] kaxil commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-686496879


   > > For tree view, can we not hide the task group and when clicking on the node it expands (I think we can do so now too right? If we click on a node it collapses)
   > 
   > Hi @kaxil, yes, you are right: we can potentially organize Tree View by TaskGroup too. The challenge there is that Tree View flattens the graph into a tree, so a node can appear more than once. I have trouble imagining what the collapsible groups should look like in the tree.
   > 
   > That said, I'm less concerned about Tree View because it's already a collapsible tree. The tree is organised by dependency instead of by TaskGroup. How about we build on this incrementally in a few PRs? First, introduce TaskGroup and make it work in Graph View. Then, have people agree on what the Tree View should look like with TaskGroup in mind, and then put up a PR to incorporate TaskGroup into Tree View?
   
   Yeah I am definitely ok with that being a separate PR. Btw thanks for the awesome work. I will definitely do a more detailed review but on a higher level this looks good.
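The duplication problem for Tree View comes from expanding a graph into a tree. Any node reachable along several paths is printed once per path, which makes it unclear where a collapsible group should live. The following is a generic sketch of that effect on a diamond-shaped DAG. It is not Airflow's Tree View code, which may walk edges in a different direction. The function name is hypothetical.

```python
from typing import Dict, List


def flatten_to_tree(graph: Dict[str, List[str]], node: str, depth: int = 0) -> List[str]:
    """Expand a DAG into indented tree lines by following edges from `node`.

    A node reachable along several paths is emitted once per path.
    """
    lines = ["  " * depth + node]
    for child in graph.get(node, []):
        lines.extend(flatten_to_tree(graph, child, depth + 1))
    return lines


# Diamond: "join" depends on both "left" and "right".
diamond = {"start": ["left", "right"], "left": ["join"], "right": ["join"]}
tree = flatten_to_tree(diamond, "start")
print("\n".join(tree))
# start
#   left
#     join
#   right
#     join
```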





[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484260857



##########
File path: airflow/example_dags/example_task_group.py
##########
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the TaskGroup."""
+
+from airflow.models.dag import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.task_group import TaskGroup
+
+
+# [START howto_task_group]
+def create_section():
+    """
+    Create tasks in the outer section.
+    """
+    dummies = [DummyOperator(task_id=f'task-{i + 1}') for i in range(5)]
+
+    with TaskGroup("inside_section_1") as inside_section_1:
+        _ = [DummyOperator(task_id=f'task-{i + 1}',) for i in range(3)]
+
+    with TaskGroup("inside_section_2") as inside_section_2:
+        _ = [DummyOperator(task_id=f'task-{i + 1}',) for i in range(3)]
+
+    dummies[-1] >> inside_section_1
+    dummies[-2] >> inside_section_2
+    inside_section_1 >> inside_section_2
+
+
+with DAG(dag_id="example_task_group", start_date=days_ago(2)) as dag:
+    start = DummyOperator(task_id="start")
+
+    with TaskGroup("section_1", tooltip="Tasks for Section 1") as section_1:
+        create_section()
+
+    some_other_task = DummyOperator(task_id="some-other-task")
+
+    with TaskGroup("section_2", tooltip="Tasks for Section 2") as section_2:
+        create_section()
+
+    end = DummyOperator(task_id='end')
+
+    start >> section_1 >> some_other_task >> section_2 >> end
+# [END howto_task_group]

Review comment:
       It took me a while to understand this example. I would suggest simplifying it a bit by:
   - avoiding the `create_section` function and creating tasks explicitly
   - adding START/END for a single `TaskGroup` section so users can grasp this as a single idea
   - adding START/END for a `TaskGroup` within a `TaskGroup` to demonstrate nesting
   
   Eventually, we may add another, simpler DAG next to this one.
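The example relies on `group >> group` and `task >> group`. The `task_group.py` diff quoted below implements these by wiring every leaf of the left side to every root of the right side. Roots are members with no upstream member, and leaves are members with no downstream member. Below is a dependency-free model of that rule. `Task` and `Group` here are hypothetical classes, not Airflow's.

```python
from typing import List, Set, Union


class Task:
    """Hypothetical task node with explicit upstream/downstream sets."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: Set["Task"] = set()
        self.downstream: Set["Task"] = set()

    def set_downstream(self, other: Union["Task", "Group"]) -> None:
        # A group expands to its root tasks; a plain task is used as-is.
        targets = other.get_roots() if isinstance(other, Group) else [other]
        for target in targets:
            self.downstream.add(target)
            target.upstream.add(self)

    def __rshift__(self, other):
        self.set_downstream(other)
        return other


class Group:
    """Hypothetical group: roots/leaves are judged only against members."""

    def __init__(self, tasks: List[Task]) -> None:
        self.tasks = list(tasks)

    def get_roots(self) -> List[Task]:
        members = set(self.tasks)
        return [t for t in self.tasks if not t.upstream & members]

    def get_leaves(self) -> List[Task]:
        members = set(self.tasks)
        return [t for t in self.tasks if not t.downstream & members]

    def __rshift__(self, other):
        # Every leaf of this group points at `other` (or at its roots).
        for leaf in self.get_leaves():
            leaf.set_downstream(other)
        return other


start, end = Task("start"), Task("end")
a1, a2 = Task("a1"), Task("a2")
b1, b2, b3 = Task("b1"), Task("b2"), Task("b3")

# Declare edges inside each group first: roots/leaves are computed when >> runs.
a1 >> a2
b1 >> b3
b2 >> b3
group_a, group_b = Group([a1, a2]), Group([b1, b2, b3])

start >> group_a >> group_b >> end
```

In this sketch, roots and leaves are computed at the moment `>>` executes. Dependencies inside a group should therefore be declared before the group is linked to anything else.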







[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r471121724



##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,233 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+TaskGroup
+"""
+
+from typing import List, Optional
+
+
+class TaskGroup:
+    """
+    A collection of tasks. Tasks within the TaskGroup have their task_id prefixed with the
+    group_id of the TaskGroup. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+    """
+    def __init__(self, group_id, parent_group=None):
+        if group_id is None:
+            # This creates a root TaskGroup.
+            self.parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+            self.parent_group = parent_group or TaskGroupContext.get_current_task_group()
+
+        self._group_id = group_id
+        if self.parent_group:
+            self.parent_group.add(self)
+        self.children = {}
+
+    @classmethod
+    def create_root(cls):
+        """
+        Create a root TaskGroup with no group_id or parent.
+        """
+        return cls(group_id=None)
+
+    @property
+    def is_root(self):
+        """
+        Returns True if this TaskGroup is the root TaskGroup. Otherwise False
+        """
+        return not self.parent_group
+
+    def __iter__(self):
+        for child in self.children.values():
+            if isinstance(child, TaskGroup):
+                for inner_task in child:
+                    yield inner_task
+            else:
+                yield child
+
+    def add(self, task):
+        """
+        Add a task to this TaskGroup.
+        """
+        if task.label in self.children:
+            raise ValueError(f"Duplicate label {task.label} in {self.group_id}")
+        self.children[task.label] = task
+
+    @property
+    def label(self):
+        """
+        group_id excluding parent's group_id.
+        """
+        return self._group_id
+
+    @property
+    def group_id(self):
+        """
+        group_id is prefixed with parent group_id if applicable.
+        """
+        if not self.group_ids:
+            return None
+
+        return ".".join(self.group_ids)
+
+    @property
+    def group_ids(self):
+        """
+        Returns all the group_id of nested TaskGroups as a list, starting from the top.
+        """
+        return list(self._group_ids())[1:]
+
+    def _group_ids(self):
+        if self.parent_group:
+            for group_id in self.parent_group._group_ids():  # pylint: disable=protected-access
+                yield group_id
+
+        yield self._group_id
+
+    def set_downstream(self, task_or_task_list) -> None:
+        """
+        Call set_downstream for all leaf tasks within this TaskGroup.
+        """
+        for task in self.get_leaves():
+            task.set_downstream(task_or_task_list)
+
+    def set_upstream(self, task_or_task_list) -> None:
+        """
+        Call set_upstream for all root tasks within this TaskGroup.
+        """
+        for task in self.get_roots():
+            task.set_upstream(task_or_task_list)
+
+    def __enter__(self):
+        TaskGroupContext.push_context_managed_task_group(self)
+        return self
+
+    def __exit__(self, _type, _value, _tb):
+        TaskGroupContext.pop_context_managed_task_group()
+
+    def get_roots(self):
+        """
+        Returns a generator of tasks that are root tasks, i.e. those with no upstream
+        dependencies within the TaskGroup.
+        """
+        for task in self:
+            if not any(parent.is_in_task_group(self.group_ids)
+                       for parent in task.get_direct_relatives(upstream=True)):
+                yield task
+
+    def get_leaves(self):
+        """
+        Returns a generator of tasks that are leaf tasks, i.e. those with no downstream
+        dependencies within the TaskGroup
+        """
+        for task in self:
+            if not any(child.is_in_task_group(self.group_ids)
+                       for child in task.get_direct_relatives(upstream=False)):
+                yield task
+
+    def __rshift__(self, other):
+        """
+        Implements Self >> Other == self.set_downstream(other)
+        """
+        self.set_downstream(other)
+        return other
+
+    def __lshift__(self, other):
+        """
+        Implements Self << Other == self.set_upstream(other)
+        """
+        self.set_upstream(other)
+        return other
+
+    def __rrshift__(self, other):
+        """
+        Called for Operator >> [Operator] because lists don't have
+        __rshift__ operators.
+        """
+        self.__lshift__(other)
+        return self
+
+    def __rlshift__(self, other):
+        """
+        Called for Operator << [Operator] because lists don't have
+        __lshift__ operators.
+        """
+        self.__rshift__(other)
+        return self
+
+    @classmethod
+    def build_task_group(cls, tasks):
+        """
+        Put tasks into TaskGroup.
+        """
+        root = TaskGroup.create_root()
+        for task in tasks:
+            current = root
+            for label in task.task_group_ids:
+                if label not in current.children:
+                    child_group = TaskGroup(group_id=label, parent_group=current)
+                else:
+                    child_group = current.children[label]
+                current = child_group
+            current.add(task)
+        return root
+
+
+class TaskGroupContext:
+    """
+    TaskGroup context is used to keep the current TaskGroup when TaskGroup is used as ContextManager.
+    """
+
+    _context_managed_task_group: Optional[TaskGroup] = None
+    _previous_context_managed_task_groups: List[TaskGroup] = []
+
+    @classmethod
+    def push_context_managed_task_group(cls, task_group: TaskGroup):
+        """
+        Push a TaskGroup into the list of managed TaskGroups.
+        """
+        if cls._context_managed_task_group:
+            cls._previous_context_managed_task_groups.append(cls._context_managed_task_group)
+        cls._context_managed_task_group = task_group
+
+    @classmethod
+    def pop_context_managed_task_group(cls) -> Optional[TaskGroup]:
+        """
+        Pops the last TaskGroup from the list of managed TaskGroups and updates the current TaskGroup.
+        """
+        old_task_group = cls._context_managed_task_group
+        if cls._previous_context_managed_task_groups:
+            cls._context_managed_task_group = cls._previous_context_managed_task_groups.pop()
+        else:
+            cls._context_managed_task_group = None
+        return old_task_group
+
+    @classmethod
+    def get_current_task_group(cls) -> Optional[TaskGroup]:
+        """
+        Get the current TaskGroup.
+        """
+        if not cls._context_managed_task_group:
+            return TaskGroup.create_root()

Review comment:
       Changed the behaviour so that if there is a current DAG but no TaskGroup, the root TaskGroup of that DAG is returned.
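The behaviour described above can be modeled as a stack of open `with` blocks that falls back to the DAG's root group when the stack is empty. The following is a minimal sketch under these assumptions. The `Group`, `Dag` and `GroupContext` classes and the `task_group` attribute name are hypothetical. The single list used here also simplifies the diff's separate "current" and "previous" fields.

```python
from typing import List, Optional


class Group:
    """Hypothetical TaskGroup stand-in usable as a context manager."""

    def __init__(self, group_id: Optional[str]) -> None:
        self.group_id = group_id

    def __enter__(self) -> "Group":
        GroupContext.push(self)
        return self

    def __exit__(self, *exc_info) -> None:
        GroupContext.pop()


class Dag:
    """Hypothetical DAG stand-in that owns exactly one root group."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.task_group = Group(None)  # root group: group_id is None


class GroupContext:
    """Tracks open `with Group(...)` blocks as a simple stack."""

    _stack: List[Group] = []

    @classmethod
    def push(cls, group: Group) -> None:
        cls._stack.append(group)

    @classmethod
    def pop(cls) -> Optional[Group]:
        return cls._stack.pop() if cls._stack else None

    @classmethod
    def current(cls, dag: Optional[Dag]) -> Optional[Group]:
        if cls._stack:
            return cls._stack[-1]
        # No group is open: fall back to the DAG's root group, if there is a DAG.
        return dag.task_group if dag else None


dag = Dag("example")
assert GroupContext.current(dag) is dag.task_group
with Group("section_1") as section:
    assert GroupContext.current(dag) is section
assert GroupContext.current(dag) is dag.task_group
```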
   







[GitHub] [airflow] potiuk edited a comment on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-691054773


   Yeah. This is unfortunate, but I think it is not caused by the "TYPE_CHECKING" problem; it is caused by the internal architecture of our "core", which is overcomplicated. Unfortunately, the project still has those cyclic dependencies (not really cyclic imports; the name in Pylint is a bit misleading).
   
   IMHO, while "technically" those cyclic imports are not triggered in our service (because of the order in which classes are usually imported when Airflow starts), they make the whole "core" part of Airflow much more complex than it should be, and difficult to reason about (and to check statically). The single-responsibility principle is still not followed in quite a few classes, and it's actually quite difficult to disentangle them. I have tried several times, as have others, and so far every attempt causes a chain reaction of changes that makes it rather difficult to accomplish.
   
   I am going to make some more attempts at that (hopefully before 2.0 or while preparing some of the release candidates).





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-690919898


   > Great work @yuqian90 👏
   
   Thanks @turbaszek.
   
   I looked into the most recent check failures after I rebased on upstream. I don't think they are related to my change. I'll see if others report the same issue and wait for a fix upstream.
   
   ```
   airflow/models/dagrun.py:72: error: unused 'type: ignore' comment
               primaryjoin=and_(TI.dag_id == dag_id, TI.execution_date == exe...
   ```





[GitHub] [airflow] turbaszek commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-690924584


   > > Great work @yuqian90 👏
   > 
   > Thanks @turbaszek.
   > 
   > I looked into the most recent check failures after I rebased on upstream. I don't think they are related to my change. I'll see if others report the same issue and wait for a fix upstream.
   > 
   > ```
   > airflow/models/dagrun.py:72: error: unused 'type: ignore' comment
   >             primaryjoin=and_(TI.dag_id == dag_id, TI.execution_date == exe...
   > ```
   
   Indeed it seems to be unrelated. @mik-laj any ideas?
   
   By the way, there's also a more severe problem:
   ```
   tests/utils/perf/dags/perf_dag_2.py:1:0: R0401: Cyclic import (airflow.models.baseoperator -> airflow.models.xcom_arg -> airflow.utils.task_group) (cyclic-import)
   ```





[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484263443



##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,392 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+A TaskGroup is a collection of closely related tasks on the same DAG that should be grouped
+together when the DAG is displayed graphically.
+"""
+
+from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException, DuplicateTaskIdFound
+
+if TYPE_CHECKING:
+    from airflow.models.baseoperator import BaseOperator
+    from airflow.models.dag import DAG
+
+
+class TaskGroup:
+    """
+    A collection of tasks. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+
+    :param group_id: a unique, meaningful id for the TaskGroup. group_id must not conflict
+        with group_id of TaskGroup or task_id of tasks in the DAG. Root TaskGroup has group_id
+        set to None.
+    :type group_id: str
+    :param prefix_group_id: If set to True, child task_id and group_id will be prefixed with
+        this TaskGroup's group_id. If set to False, child task_id and group_id are not prefixed.
+        Default is True.
+    :type prefix_group_id: bool
+    :param parent_group: The parent TaskGroup of this TaskGroup. parent_group is set to None
+        for the root TaskGroup.
+    :type parent_group: TaskGroup
+    :param dag: The DAG that this TaskGroup belongs to.
+    :type dag: airflow.models.DAG
+    :param tooltip: The tooltip of the TaskGroup node when displayed in the UI
+    :type tooltip: str
+    :param ui_color: The fill color of the TaskGroup node when displayed in the UI
+    :type ui_color: str
+    :param ui_fgcolor: The label color of the TaskGroup node when displayed in the UI
+    :type ui_fgcolor: str
+    """
+
+    def __init__(
+        self,
+        group_id: Optional[str],
+        prefix_group_id: bool = True,
+        parent_group: Optional["TaskGroup"] = None,
+        dag: Optional["DAG"] = None,
+        tooltip: str = "",
+        ui_color: str = "CornflowerBlue",
+        ui_fgcolor: str = "#000",
+    ):
+        from airflow.models.dag import DagContext
+
+        self.prefix_group_id = prefix_group_id
+
+        if group_id is None:
+            # This creates a root TaskGroup.
+            if parent_group:
+                raise AirflowException("Root TaskGroup cannot have parent_group")
+            # used_group_ids is shared across all TaskGroups in the same DAG to keep track
+            # of used group_id to avoid duplication.
+            self.used_group_ids: Set[Optional[str]] = set()
+            self._parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+
+            dag = dag or DagContext.get_current_dag()
+
+            if not parent_group and not dag:
+                raise AirflowException("TaskGroup can only be used inside a dag")
+
+            self._parent_group = parent_group or TaskGroupContext.get_current_task_group(dag)
+            if not self._parent_group:
+                raise AirflowException("TaskGroup must have a parent_group except for the root TaskGroup")
+            self.used_group_ids = self._parent_group.used_group_ids
+
+        self._group_id = group_id
+        if self.group_id in self.used_group_ids:
+            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
+        self.used_group_ids.add(self.group_id)
+        self.used_group_ids.add(self.downstream_join_id)
+        self.used_group_ids.add(self.upstream_join_id)
+        self.children: Dict[str, Union["BaseOperator", "TaskGroup"]] = {}
+        if self._parent_group:
+            self._parent_group.add(self)
+
+        self.tooltip = tooltip
+        self.ui_color = ui_color
+        self.ui_fgcolor = ui_fgcolor
+
+        # Keep track of TaskGroups or tasks that depend on this entire TaskGroup separately
+        # so that we can optimize the number of edges when entire TaskGroups depend on each other.
+        self.upstream_group_ids: Set[Optional[str]] = set()
+        self.downstream_group_ids: Set[Optional[str]] = set()
+        self.upstream_task_ids: Set[Optional[str]] = set()
+        self.downstream_task_ids: Set[Optional[str]] = set()
+
+    @classmethod
+    def create_root(cls, dag: "DAG"):

Review comment:
       ```suggestion
       def create_root(cls, dag: "DAG") -> "TaskGroup":
   ```
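The suggested `-> "TaskGroup"` annotation is written as a string because the class name is not yet bound while its own body is executing. The sketch below is a hypothetical `TaskGroupSketch`, not the PR's class, and it omits the `dag` argument. It shows three things:

- a root created through such a factory,
- the `used_group_ids` set shared from the root downwards,
- parent-controlled prefixing as described in the `prefix_group_id` docstring above.

```python
from typing import Dict, Optional, Set


class TaskGroupSketch:
    """Hypothetical, trimmed-down TaskGroup: ids, nesting and prefixing only."""

    def __init__(
        self,
        group_id: Optional[str],
        parent_group: Optional["TaskGroupSketch"] = None,
        prefix_group_id: bool = True,
    ) -> None:
        self._group_id = group_id
        self._parent_group = parent_group
        self.prefix_group_id = prefix_group_id
        self.children: Dict[str, "TaskGroupSketch"] = {}
        # The root owns the set; every descendant shares that same set object.
        self.used_group_ids: Set[Optional[str]] = (
            parent_group.used_group_ids if parent_group else set()
        )
        if self.group_id in self.used_group_ids:
            raise ValueError(f"group_id '{self.group_id}' has already been added to the DAG")
        self.used_group_ids.add(self.group_id)
        if parent_group is not None and group_id is not None:
            parent_group.children[group_id] = self

    @classmethod
    def create_root(cls) -> "TaskGroupSketch":
        # String annotation: the class name is not bound yet while its body runs.
        return cls(group_id=None)

    @property
    def group_id(self) -> Optional[str]:
        # The *parent's* prefix_group_id decides whether this id gets prefixed.
        parent = self._parent_group
        if parent is not None and parent.prefix_group_id and parent.group_id:
            return f"{parent.group_id}.{self._group_id}"
        return self._group_id


root = TaskGroupSketch.create_root()
section = TaskGroupSketch("section_1", parent_group=root)
inner = TaskGroupSketch("inner", parent_group=section)
print(inner.group_id)  # section_1.inner
```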







[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484263191



##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,392 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+A TaskGroup is a collection of closely related tasks on the same DAG that should be grouped
+together when the DAG is displayed graphically.
+"""
+
+from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException, DuplicateTaskIdFound
+
+if TYPE_CHECKING:
+    from airflow.models.baseoperator import BaseOperator
+    from airflow.models.dag import DAG
+
+
+class TaskGroup:
+    """
+    A collection of tasks. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+
+    :param group_id: a unique, meaningful id for the TaskGroup. group_id must not conflict
+        with group_id of TaskGroup or task_id of tasks in the DAG. Root TaskGroup has group_id
+        set to None.
+    :type group_id: str
+    :param prefix_group_id: If set to True, child task_id and group_id will be prefixed with
+        this TaskGroup's group_id. If set to False, child task_id and group_id are not prefixed.
+        Default is True.
+    :type prefix_group_id: bool
+    :param parent_group: The parent TaskGroup of this TaskGroup. parent_group is set to None
+        for the root TaskGroup.
+    :type parent_group: TaskGroup
+    :param dag: The DAG that this TaskGroup belongs to.
+    :type dag: airflow.models.DAG
+    :param tooltip: The tooltip of the TaskGroup node when displayed in the UI
+    :type tooltip: str
+    :param ui_color: The fill color of the TaskGroup node when displayed in the UI
+    :type ui_color: str
+    :param ui_fgcolor: The label color of the TaskGroup node when displayed in the UI
+    :type ui_fgcolor: str
+    """
+
+    def __init__(
+        self,
+        group_id: Optional[str],
+        prefix_group_id: bool = True,
+        parent_group: Optional["TaskGroup"] = None,
+        dag: Optional["DAG"] = None,
+        tooltip: str = "",
+        ui_color: str = "CornflowerBlue",
+        ui_fgcolor: str = "#000",
+    ):
+        from airflow.models.dag import DagContext
+
+        self.prefix_group_id = prefix_group_id
+
+        if group_id is None:
+            # This creates a root TaskGroup.
+            if parent_group:
+                raise AirflowException("Root TaskGroup cannot have parent_group")
+            # used_group_ids is shared across all TaskGroups in the same DAG to keep track
+            # of used group_id to avoid duplication.
+            self.used_group_ids: Set[Optional[str]] = set()
+            self._parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+
+            dag = dag or DagContext.get_current_dag()
+
+            if not parent_group and not dag:
+                raise AirflowException("TaskGroup can only be used inside a dag")
+
+            self._parent_group = parent_group or TaskGroupContext.get_current_task_group(dag)
+            if not self._parent_group:
+                raise AirflowException("TaskGroup must have a parent_group except for the root TaskGroup")
+            self.used_group_ids = self._parent_group.used_group_ids
+
+        self._group_id = group_id
+        if self.group_id in self.used_group_ids:
+            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
+        self.used_group_ids.add(self.group_id)
+        self.used_group_ids.add(self.downstream_join_id)
+        self.used_group_ids.add(self.upstream_join_id)
+        self.children: Dict[str, Union["BaseOperator", "TaskGroup"]] = {}
+        if self._parent_group:
+            self._parent_group.add(self)
+
+        self.tooltip = tooltip
+        self.ui_color = ui_color
+        self.ui_fgcolor = ui_fgcolor
+
+        # Keep track of TaskGroups or tasks that depend on this entire TaskGroup separately
+        # so that we can optimize the number of edges when entire TaskGroups depend on each other.
+        self.upstream_group_ids: Set[Optional[str]] = set()
+        self.downstream_group_ids: Set[Optional[str]] = set()
+        self.upstream_task_ids: Set[Optional[str]] = set()
+        self.downstream_task_ids: Set[Optional[str]] = set()
+
+    @classmethod
+    def create_root(cls, dag: "DAG"):
+        """
+        Create a root TaskGroup with no group_id or parent.
+        """
+        return cls(group_id=None, dag=dag)
+
+    @property
+    def is_root(self):

Review comment:
       ```suggestion
       def is_root(self) -> bool:
   ```
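The hunk above shares one `used_group_ids` set between the root TaskGroup and all of its descendants, so a duplicate `group_id` anywhere in the DAG is rejected. Below is a minimal sketch of that bookkeeping that also includes the suggested `-> bool` annotation on `is_root`. `SketchGroup` is a hypothetical stand-in, not the Airflow class. It raises `ValueError` instead of Airflow's exceptions and omits the join ids and UI fields.

```python
from typing import Dict, Optional, Set


class SketchGroup:
    """Hypothetical stand-in for TaskGroup's id bookkeeping (not the Airflow class)."""

    def __init__(self, group_id: Optional[str], parent: Optional["SketchGroup"] = None) -> None:
        if group_id is None:
            if parent is not None:
                raise ValueError("Root group cannot have a parent")
            # Only the root creates the registry of used ids.
            self.used_group_ids: Set[Optional[str]] = set()
        else:
            if parent is None:
                raise ValueError("Only the root group may omit a parent")
            # Every descendant shares the root's registry, so ids are unique DAG-wide.
            self.used_group_ids = parent.used_group_ids
        self._group_id = group_id
        self._parent = parent
        if self.group_id in self.used_group_ids:
            raise ValueError(f"group_id '{self.group_id}' has already been added")
        self.used_group_ids.add(self.group_id)
        self.children: Dict[str, "SketchGroup"] = {}
        if parent is not None:
            parent.children[self.group_id] = self

    @property
    def group_id(self) -> Optional[str]:
        # Prefix with the parent's id when the parent has one (the prefix_group_id=True case).
        if self._parent is not None and self._parent.group_id:
            return f"{self._parent.group_id}.{self._group_id}"
        return self._group_id

    @property
    def is_root(self) -> bool:
        # The root is the only group without a group_id.
        return not self.group_id


root = SketchGroup(None)
section_1 = SketchGroup("section_1", parent=root)
inner = SketchGroup("inner", parent=section_1)
print(inner.group_id)  # section_1.inner
print(root.is_root, section_1.is_root)  # True False
```

Because the registry stores the prefixed id, the same label (`inner`) can be reused under a different parent. A second `section_1` directly under the root, however, is rejected.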




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484264466



##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,392 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+A TaskGroup is a collection of closely related tasks on the same DAG that should be grouped
+together when the DAG is displayed graphically.
+"""
+
+from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException, DuplicateTaskIdFound
+
+if TYPE_CHECKING:
+    from airflow.models.baseoperator import BaseOperator
+    from airflow.models.dag import DAG
+
+
+class TaskGroup:
+    """
+    A collection of tasks. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+
+    :param group_id: a unique, meaningful id for the TaskGroup. group_id must not conflict
+        with group_id of TaskGroup or task_id of tasks in the DAG. Root TaskGroup has group_id
+        set to None.
+    :type group_id: str
+    :param prefix_group_id: If set to True, child task_id and group_id will be prefixed with
+        this TaskGroup's group_id. If set to False, child task_id and group_id are not prefixed.
+        Default is True.
+    :type prefix_group_id: bool
+    :param parent_group: The parent TaskGroup of this TaskGroup. parent_group is set to None
+        for the root TaskGroup.
+    :type parent_group: TaskGroup
+    :param dag: The DAG that this TaskGroup belongs to.
+    :type dag: airflow.models.DAG
+    :param tooltip: The tooltip of the TaskGroup node when displayed in the UI
+    :type tooltip: str
+    :param ui_color: The fill color of the TaskGroup node when displayed in the UI
+    :type ui_color: str
+    :param ui_fgcolor: The label color of the TaskGroup node when displayed in the UI
+    :type ui_fgcolor: str
+    """
+
+    def __init__(
+        self,
+        group_id: Optional[str],
+        prefix_group_id: bool = True,
+        parent_group: Optional["TaskGroup"] = None,
+        dag: Optional["DAG"] = None,
+        tooltip: str = "",
+        ui_color: str = "CornflowerBlue",
+        ui_fgcolor: str = "#000",
+    ):
+        from airflow.models.dag import DagContext
+
+        self.prefix_group_id = prefix_group_id
+
+        if group_id is None:
+            # This creates a root TaskGroup.
+            if parent_group:
+                raise AirflowException("Root TaskGroup cannot have parent_group")
+            # used_group_ids is shared across all TaskGroups in the same DAG to keep track
+            # of used group_id to avoid duplication.
+            self.used_group_ids: Set[Optional[str]] = set()
+            self._parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+
+            dag = dag or DagContext.get_current_dag()
+
+            if not parent_group and not dag:
+                raise AirflowException("TaskGroup can only be used inside a dag")
+
+            self._parent_group = parent_group or TaskGroupContext.get_current_task_group(dag)
+            if not self._parent_group:
+                raise AirflowException("TaskGroup must have a parent_group except for the root TaskGroup")
+            self.used_group_ids = self._parent_group.used_group_ids
+
+        self._group_id = group_id
+        if self.group_id in self.used_group_ids:
+            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
+        self.used_group_ids.add(self.group_id)
+        self.used_group_ids.add(self.downstream_join_id)
+        self.used_group_ids.add(self.upstream_join_id)
+        self.children: Dict[str, Union["BaseOperator", "TaskGroup"]] = {}
+        if self._parent_group:
+            self._parent_group.add(self)
+
+        self.tooltip = tooltip
+        self.ui_color = ui_color
+        self.ui_fgcolor = ui_fgcolor
+
+        # Keep track of TaskGroups or tasks that depend on this entire TaskGroup separately
+        # so that we can optimize the number of edges when entire TaskGroups depend on each other.
+        self.upstream_group_ids: Set[Optional[str]] = set()
+        self.downstream_group_ids: Set[Optional[str]] = set()
+        self.upstream_task_ids: Set[Optional[str]] = set()
+        self.downstream_task_ids: Set[Optional[str]] = set()
+
+    @classmethod
+    def create_root(cls, dag: "DAG"):
+        """
+        Create a root TaskGroup with no group_id or parent.
+        """
+        return cls(group_id=None, dag=dag)
+
+    @property
+    def is_root(self):
+        """
+        Returns True if this TaskGroup is the root TaskGroup. Otherwise False.
+        """
+        return not self.group_id
+
+    def __iter__(self):
+        for child in self.children.values():
+            if isinstance(child, TaskGroup):
+                for inner_task in child:
+                    yield inner_task
+            else:
+                yield child
+
+    def add(self, task: Union["BaseOperator", "TaskGroup"]) -> None:
+        """
+        Add a task to this TaskGroup.
+        """
+        key = task.group_id if isinstance(task, TaskGroup) else task.task_id
+
+        if key in self.children:
+            raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
+
+        if isinstance(task, TaskGroup):
+            if task.children:
+                raise AirflowException("Cannot add a non-empty TaskGroup")
+
+        self.children[key] = task  # type: ignore
+
+    @property
+    def group_id(self) -> Optional[str]:
+        """
+        group_id of this TaskGroup.
+        """
+        if self._parent_group and self._parent_group.prefix_group_id and self._parent_group.group_id:
+            return self._parent_group.child_id(self._group_id)
+
+        return self._group_id
+
+    @property
+    def label(self):
+        """
+        group_id without the parent's group_id prefix, used as the node label in the UI.
+        """
+        return self._group_id
+
+    def _set_relative(
+            self,
+            task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"],
+            upstream: bool = False
+    ) -> None:
+        """
+        Call set_upstream/set_downstream for all root/leaf tasks within this TaskGroup.
+        Update upstream_group_ids/downstream_group_ids/upstream_task_ids/downstream_task_ids.
+        """
+        from airflow.models.baseoperator import BaseOperator
+
+        if upstream:
+            for task in self.get_roots():
+                task.set_upstream(task_or_task_list)
+        else:
+            for task in self.get_leaves():
+                task.set_downstream(task_or_task_list)
+
+        # Update upstream_group_ids/downstream_group_ids/upstream_task_ids/downstream_task_ids
+        # accordingly so that we can reduce the number of edges when displaying Graph View.
+        if isinstance(task_or_task_list, TaskGroup):
+            # Handles TaskGroup and TaskGroup
+            if upstream:
+                parent, child = (self, task_or_task_list)
+            else:
+                parent, child = (task_or_task_list, self)
+
+            parent.upstream_group_ids.add(child.group_id)
+            child.downstream_group_ids.add(parent.group_id)
+        else:
+            # Handles TaskGroup and task or list of tasks
+            try:
+                task_list = list(task_or_task_list)  # type: ignore
+            except TypeError:
+                task_list = [task_or_task_list]  # type: ignore
+
+            for task in task_list:
+                if not isinstance(task, BaseOperator):
+                    raise AirflowException("Relationships can only be set between TaskGroup or operators; "
+                                           f"received {task.__class__.__name__}")
+
+                if upstream:
+                    self.upstream_task_ids.add(task.task_id)
+                else:
+                    self.downstream_task_ids.add(task.task_id)
+
+    def set_downstream(
+        self, task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"]
+    ) -> None:
+        """
+        Set a TaskGroup/task/list of tasks downstream of this TaskGroup.
+        """
+        self._set_relative(task_or_task_list, upstream=False)
+
+    def set_upstream(
+        self, task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"]
+    ) -> None:
+        """
+        Set a TaskGroup/task/list of tasks upstream of this TaskGroup.
+        """
+        self._set_relative(task_or_task_list, upstream=True)
+
+    def __enter__(self):
+        TaskGroupContext.push_context_managed_task_group(self)
+        return self
+
+    def __exit__(self, _type, _value, _tb):
+        TaskGroupContext.pop_context_managed_task_group()
+
+    def has_task(self, task: "BaseOperator") -> bool:
+        """
+        Returns True if this TaskGroup or any of its child TaskGroups contains the given task.
+        """
+        if task.task_id in self.children:
+            return True
+
+        return any(child.has_task(task) for child in self.children.values() if isinstance(child, TaskGroup))
+
+    def get_roots(self) -> Generator["BaseOperator", None, None]:
+        """
+        Returns a generator of tasks that are root tasks, i.e. those with no upstream
+        dependencies within the TaskGroup.
+        """
+        for task in self:
+            if not any(self.has_task(parent) for parent in task.get_direct_relatives(upstream=True)):
+                yield task
+
+    def get_leaves(self) -> Generator["BaseOperator", None, None]:
+        """
+        Returns a generator of tasks that are leaf tasks, i.e. those with no downstream
+        dependencies within the TaskGroup.
+        """
+        for task in self:
+            if not any(self.has_task(child) for child in task.get_direct_relatives(upstream=False)):
+                yield task
+
+    def __rshift__(self, other):
+        """
+        Implements Self >> Other == self.set_downstream(other)
+        """
+        self.set_downstream(other)
+        return other
+
+    def __lshift__(self, other):
+        """
+        Implements Self << Other == self.set_upstream(other)
+        """
+        self.set_upstream(other)
+        return other
+
+    def __rrshift__(self, other):
+        """
+        Called for [Operator] >> TaskGroup because lists don't have
+        __rshift__ operators.
+        """
+        self.__lshift__(other)
+        return self
+
+    def __rlshift__(self, other):
+        """
+        Called for [Operator] << TaskGroup because lists don't have
+        __lshift__ operators.
+        """
+        self.__rshift__(other)
+        return self
+
+    def child_id(self, label):
+        """
+        Prefix label with group_id if prefix_group_id is True. Otherwise return the label
+        as-is.
+        """
+        if self.prefix_group_id and self.group_id:
+            return f"{self.group_id}.{label}"
+
+        return label
+
+    @property
+    def upstream_join_id(self):
+        """
+        If this TaskGroup has immediate upstream TaskGroups or tasks, a dummy node called
+        upstream_join_id will be created in Graph View to join the incoming edges into this
+        TaskGroup to reduce the total number of edges needed to be displayed.
+        """
+        return f"{self.group_id}.upstream_join_id"
+
+    @property
+    def downstream_join_id(self):
+        """
+        If this TaskGroup has immediate downstream TaskGroups or tasks, a dummy node called
+        downstream_join_id will be created in Graph View to join the outgoing edges from this
+        TaskGroup to reduce the total number of edges needed to be displayed.
+        """
+        return f"{self.group_id}.downstream_join_id"
+
+    def get_task_group_dict(self) -> Dict[str, "TaskGroup"]:
+        """
+        Returns a flat dictionary of group_id: TaskGroup
+        """
+        task_group_map = {}
+
+        def build_map(task_group):
+            if not isinstance(task_group, TaskGroup):
+                return
+
+            task_group_map[task_group.group_id] = task_group
+
+            for child in task_group.children.values():
+                build_map(child)
+
+        build_map(self)
+        return task_group_map
+
+    def get_child_by_label(self, label):

Review comment:
       Would you mind adding type hints?
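The body of `get_child_by_label` is cut off in this excerpt. The sketch below shows what a type-hinted version could look like, assuming it resolves the label through `child_id()` and then looks it up in `children`. `Task`, `Group` and `add_task` are hypothetical stand-ins, not Airflow classes.

```python
from typing import Dict, Optional, Union


class Task:
    """Hypothetical minimal operator that only carries a task_id."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class Group:
    """Hypothetical minimal group mirroring child_id() and children from the diff."""

    def __init__(self, group_id: Optional[str], prefix_group_id: bool = True) -> None:
        self.group_id = group_id
        self.prefix_group_id = prefix_group_id
        self.children: Dict[str, Union[Task, "Group"]] = {}

    def child_id(self, label: str) -> str:
        # Same rule as the diff: prefix only when enabled and the group has an id.
        if self.prefix_group_id and self.group_id:
            return f"{self.group_id}.{label}"
        return label

    def add_task(self, label: str) -> Task:
        # Hypothetical helper: create a task whose id is derived from its label.
        task = Task(self.child_id(label))
        self.children[task.task_id] = task
        return task

    def get_child_by_label(self, label: str) -> Union[Task, "Group"]:
        """Look up a direct child by its unprefixed label (raises KeyError if absent)."""
        return self.children[self.child_id(label)]
```

Callers can then pass the short label shown in the UI and get back the child whose `task_id` or `group_id` carries the prefix.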







[GitHub] [airflow] KevinYang21 commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
KevinYang21 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-680713393


   Would you mind elaborating a bit more on the impact on the Tree View? Would it be rendered just like SubDagOperators are? Thanks.





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-674536558


   @houqp, thanks for your review. Here are the changes since your last review.
   @kaxil I made some changes to `serialized_objects.py` to handle serialization/deserialization of `dag.task_group`. Could you take a look?
   
   - Added serialization of `TaskGroup`. `dag.task_group` is serialized as a `dict`.
   - Improved UI interaction:
      - The graph now pans to focus on the most recently expanded/collapsed node.
      - If text is typed into the search box, TaskGroup nodes with matching child tasks are also highlighted.
      - TaskGroup now has a tooltip showing a count of its child TaskInstances by state.
   - Added type hints to task_group.py.
   - Added tests.
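The comment says `dag.task_group` is serialized as a `dict` but does not show the schema. As an illustration only, the sketch below round-trips a nested group tree through a JSON-compatible dict. The field names and the `GroupNode` class are hypothetical and are not the format used in `serialized_objects.py`.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class GroupNode:
    """Hypothetical in-memory group: a label, a UI hint, task ids and nested subgroups."""

    label: str
    tooltip: str = ""
    task_ids: List[str] = field(default_factory=list)
    subgroups: List["GroupNode"] = field(default_factory=list)


def serialize_group(group: GroupNode) -> Dict[str, Any]:
    # Tasks are referenced by id only; the tasks themselves would be serialized elsewhere.
    return {
        "label": group.label,
        "tooltip": group.tooltip,
        "task_ids": list(group.task_ids),
        "subgroups": [serialize_group(g) for g in group.subgroups],
    }


def deserialize_group(data: Dict[str, Any]) -> GroupNode:
    # Rebuild the tree recursively from the plain dict.
    return GroupNode(
        label=data["label"],
        tooltip=data["tooltip"],
        task_ids=list(data["task_ids"]),
        subgroups=[deserialize_group(d) for d in data["subgroups"]],
    )


inner = GroupNode("inner_section_2", task_ids=["section_2.inner_section_2.task_2"])
section_2 = GroupNode("section_2", tooltip="Tasks for section_2",
                      task_ids=["section_2.task_1"], subgroups=[inner])
payload = json.dumps(serialize_group(section_2))
restored = deserialize_group(json.loads(payload))
```

Storing only task ids in the group dict keeps the group tree a pure UI structure, which matches the PR's framing of TaskGroup as a grouping concept rather than a separate DAG.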





[GitHub] [airflow] kaxil commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-708299298


   > It's a nice feature. Any idea when it'll be available (GA release)?
   
   In Airflow 2.0, by the end of this year.





[GitHub] [airflow] casassg commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
casassg commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r480247681



##########
File path: airflow/models/baseoperator.py
##########
@@ -382,7 +389,16 @@ def __init__(
                 stacklevel=3
             )
         validate_key(task_id)
-        self.task_id = task_id
+        self.label = task_id
+
+        # Prefix task_id with group_id
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        if task_group:
+            self.task_id = f"{task_group.group_id}.{self.label}" if task_group.group_id else self.label

Review comment:
       > Do not prefix task_id with group_id
   
   This is a downgrade from the current SubDagOperator. At the moment, you can have duplicate task_ids across subdags in the same DAG. I would suggest actually prefixing.
   
   Also, this change diverges from what was proposed and voted on in https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-34+TaskGroup%3A+A+UI+task+grouping+concept+as+an+alternative+to+SubDagOperator. I would either bring this up for discussion on the mailing list and wait for lazy consensus, or revise the AIP and resubmit it for a vote. 
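The collision concern can be illustrated with the prefixing rule from the hunk above, which builds `f"{task_group.group_id}.{self.label}"` when the group has an id. In this sketch, `make_task_id` and `register` are hypothetical helpers, and the `prefix` flag stands in for TaskGroup's `prefix_group_id` parameter.

```python
from typing import Optional, Set


def make_task_id(label: str, group_id: Optional[str], prefix: bool = True) -> str:
    """Prefix the label with the group id when prefixing is on and the group has an id."""
    if prefix and group_id:
        return f"{group_id}.{label}"
    return label


def register(task_ids: Set[str], label: str, group_id: Optional[str], prefix: bool = True) -> str:
    """Add a task id to a DAG-wide set, rejecting duplicates as a DAG would."""
    task_id = make_task_id(label, group_id, prefix)
    if task_id in task_ids:
        raise ValueError(f"Duplicate task_id '{task_id}'")
    task_ids.add(task_id)
    return task_id


# With prefixing, the same label can live in two groups of one DAG.
prefixed: Set[str] = set()
register(prefixed, "task_1", "section_1")
register(prefixed, "task_1", "section_2")
print(sorted(prefixed))  # ['section_1.task_1', 'section_2.task_1']

# Without prefixing, reusing the label in a second group collides.
unprefixed: Set[str] = set()
register(unprefixed, "task_1", "section_1", prefix=False)
try:
    register(unprefixed, "task_1", "section_2", prefix=False)
except ValueError as err:
    print(err)  # Duplicate task_id 'task_1'
```

This mirrors what SubDagOperator allows today, where each subdag has its own task_id namespace.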







[GitHub] [airflow] turbaszek edited a comment on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-689473660


   > I'm not sure if this is a bug or a `XComArg` feature. Since it's not a `TaskGroup` issue, the change (if any) should probably be done separately in a different PR.
   
   I would say it's a bug that we should definitely fix:
   https://github.com/apache/airflow/pull/10827
   





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-688778222


   > I love this change! This DAG representation is nice and clean. I'm just wondering if this will work with the AIP-31 approach (`@task` decorator), especially:
   > 
   > ```python
   > @task
   > def my_task():
   >     ...
   > 
   > with TaskGroup(...) as tg:
   >     ...
   > 
   > r = my_task()
   > r >> tg
   > ```
   
   Hi @turbaszek, thanks for the detailed review. I have updated the PR accordingly.
   
   Regarding the question about AIP-31, I just verified that it works like a charm. Your call to `my_task()` needs to be indented so that it runs inside the TaskGroup context manager, i.e. something like this:
   
   ```python
   @task
   def my_task():
      ...
    
   with TaskGroup(...) as tg:
      ...
      r = my_task()
   
   r >> tg
   ```
   
   Looking at the implementation of the `@task` decorator, it's no surprise this works, because it calls this line under the hood. As long as that line runs within the context of a TaskGroup, the operator is added to the group. I think passing `task_group` as an argument to the decorator would also work, although the code would not look as elegant.
   ```python
   ...
               op = _PythonFunctionalOperator(python_callable=f, op_args=args, op_kwargs=f_kwargs,
                                              multiple_outputs=multiple_outputs, **kwargs)
   
   ```
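The decorator works because the operator looks up the currently active TaskGroup when it is constructed. The diff does this through `TaskGroupContext`. The sketch below shows that pattern with a class-level stack. `Group`, `Operator` and this `task` decorator are hypothetical simplifications, not Airflow's implementations.

```python
from typing import List, Optional


class Group:
    """Hypothetical group that collects tasks created while it is the active context."""

    _stack: List["Group"] = []

    def __init__(self, group_id: str) -> None:
        self.group_id = group_id
        self.task_ids: List[str] = []

    def __enter__(self) -> "Group":
        Group._stack.append(self)
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        Group._stack.pop()

    @classmethod
    def current(cls) -> Optional["Group"]:
        # The innermost active group, or None outside any `with` block.
        return cls._stack[-1] if cls._stack else None


class Operator:
    """Hypothetical operator: joins whatever group is active when it is constructed."""

    def __init__(self, task_id: str) -> None:
        group = Group.current()
        self.task_id = f"{group.group_id}.{task_id}" if group else task_id
        if group:
            group.task_ids.append(self.task_id)


def task(func):
    """Hypothetical decorator: calling the wrapped function builds an Operator
    (instead of running func), loosely like @task does."""
    def wrapper(*args, **kwargs):
        return Operator(task_id=func.__name__)
    return wrapper


@task
def my_task():
    ...


with Group("tg") as tg:
    r = my_task()  # constructed inside the context, so it joins "tg"

outside = my_task()  # constructed outside any group
```

Only the call site matters. Where `my_task` is defined is irrelevant, which is why moving the call inside the `with` block is enough.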
   
   Here's a fully working example, slightly modified from `example_task_group`. Notice how the `hello_world` task appears in two groups; the underlying `task_id`s are different, as shown in the Tree View. 
   
   ```python
   from airflow.models.dag import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.dates import days_ago
   from airflow.utils.task_group import TaskGroup
   from airflow.operators.python import task, PythonOperator
   
   @task
   def hello_world(value):
       print(value)
   
   # [START howto_task_group]
   with DAG(dag_id="example_task_group2", start_date=days_ago(2)) as dag:
       start = DummyOperator(task_id="start")
   
       # [START howto_task_group_section_1]
       with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1:
           task_1 = DummyOperator(task_id="task_1")
           task_2 = DummyOperator(task_id="task_2")
           task_3 = PythonOperator(task_id="task_3", python_callable=lambda: "task_3 output")
           hello_world(task_3.output)
   
           task_1 >> [task_2, task_3]
       # [END howto_task_group_section_1]
   
       # [START howto_task_group_section_2]
       with TaskGroup("section_2", tooltip="Tasks for section_2") as section_2:
           task_1 = DummyOperator(task_id="task_1")
   
           # [START howto_task_group_inner_section_2]
           with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
               task_2 = DummyOperator(task_id="task_2")
               task_3 = PythonOperator(task_id="task_3", python_callable=lambda: "task_3 output")
               task_hello = hello_world(task_3.output)
   
           # [END howto_task_group_inner_section_2]
   
       # [END howto_task_group_section_2]
   
       end = DummyOperator(task_id='end')
   
       start >> section_1 >> section_2 >> end
   # [END howto_task_group]
   ```
   
   ![image](https://user-images.githubusercontent.com/6637585/92465281-f5718200-f200-11ea-8473-af0f691cc0de.png)
   
   ![image](https://user-images.githubusercontent.com/6637585/92465577-58631900-f201-11ea-8e70-436506de99cf.png)
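`start >> section_1 >> section_2 >> end` relies on `TaskGroup.__rshift__`. In the diff, this calls `set_downstream` on each task returned by `get_leaves()`, i.e. tasks with no downstream dependency inside the group. The sketch below assumes the receiving group resolves to its root tasks, so every leaf of one group ends up upstream of every root of the next. It also counts edges. According to the `upstream_join_id`/`downstream_join_id` docstrings, Graph View can route these edges through a single join node, which draws `leaves + roots` edges instead of `leaves * roots`. `Task`, `Group` and `edge_counts` are hypothetical.

```python
from typing import List, Set, Tuple


class Task:
    """Hypothetical task that tracks direct upstream/downstream relatives."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: Set["Task"] = set()
        self.downstream: Set["Task"] = set()

    def __rshift__(self, other: "Task") -> "Task":
        self.downstream.add(other)
        other.upstream.add(self)
        return other


class Group:
    """Hypothetical group: roots/leaves are computed relative to the group's own tasks."""

    def __init__(self, tasks: List[Task]) -> None:
        self.tasks = tasks

    def roots(self) -> List[Task]:
        # No upstream dependency inside this group (like get_roots()).
        return [t for t in self.tasks if not any(u in self.tasks for u in t.upstream)]

    def leaves(self) -> List[Task]:
        # No downstream dependency inside this group (like get_leaves()).
        return [t for t in self.tasks if not any(d in self.tasks for d in t.downstream)]

    def __rshift__(self, other: "Group") -> "Group":
        # Every leaf of self becomes upstream of every root of other.
        for leaf in self.leaves():
            for root in other.roots():
                leaf >> root
        return other


def edge_counts(upstream: Group, downstream: Group) -> Tuple[int, int]:
    """(edges drawn directly, edges drawn through one join node) between two groups."""
    n_leaves, n_roots = len(upstream.leaves()), len(downstream.roots())
    return n_leaves * n_roots, n_leaves + n_roots


a1, a2, a3 = Task("section_1.task_1"), Task("section_1.task_2"), Task("section_1.task_3")
a1 >> a2
a1 >> a3
section_1 = Group([a1, a2, a3])

b1, b2, b3 = Task("section_2.task_1"), Task("section_2.task_2"), Task("section_2.task_3")
section_2 = Group([b1, b2, b3])

section_1 >> section_2
print(sorted(t.task_id for t in b1.upstream))  # ['section_1.task_2', 'section_1.task_3']
print(edge_counts(section_1, section_2))  # (6, 5)
```

With 10 leaves feeding 10 roots, that is 100 edges drawn directly versus 20 through a join node. The underlying dependencies are the same either way.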
   





[GitHub] [airflow] potiuk edited a comment on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-691054773









[GitHub] [airflow] yuqian90 closed pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #10153:
URL: https://github.com/apache/airflow/pull/10153


   





[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484264186



##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,392 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+A TaskGroup is a collection of closely related tasks on the same DAG that should be grouped
+together when the DAG is displayed graphically.
+"""
+
+from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException, DuplicateTaskIdFound
+
+if TYPE_CHECKING:
+    from airflow.models.baseoperator import BaseOperator
+    from airflow.models.dag import DAG
+
+
+class TaskGroup:
+    """
+    A collection of tasks. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+
+    :param group_id: a unique, meaningful id for the TaskGroup. group_id must not conflict
+        with group_id of TaskGroup or task_id of tasks in the DAG. Root TaskGroup has group_id
+        set to None.
+    :type group_id: str
+    :param prefix_group_id: If set to True, child task_id and group_id will be prefixed with
+        this TaskGroup's group_id. If set to False, child task_id and group_id are not prefixed.
+        Default is True.
+    :type prefix_group_id: bool
+    :param parent_group: The parent TaskGroup of this TaskGroup. parent_group is set to None
+        for the root TaskGroup.
+    :type parent_group: TaskGroup
+    :param dag: The DAG that this TaskGroup belongs to.
+    :type dag: airflow.models.DAG
+    :param tooltip: The tooltip of the TaskGroup node when displayed in the UI
+    :type tooltip: str
+    :param ui_color: The fill color of the TaskGroup node when displayed in the UI
+    :type ui_color: str
+    :param ui_fgcolor: The label color of the TaskGroup node when displayed in the UI
+    :type ui_fgcolor: str
+    """
+
+    def __init__(
+        self,
+        group_id: Optional[str],
+        prefix_group_id: bool = True,
+        parent_group: Optional["TaskGroup"] = None,
+        dag: Optional["DAG"] = None,
+        tooltip: str = "",
+        ui_color: str = "CornflowerBlue",
+        ui_fgcolor: str = "#000",
+    ):
+        from airflow.models.dag import DagContext
+
+        self.prefix_group_id = prefix_group_id
+
+        if group_id is None:
+            # This creates a root TaskGroup.
+            if parent_group:
+                raise AirflowException("Root TaskGroup cannot have parent_group")
+            # used_group_ids is shared across all TaskGroups in the same DAG to keep track
+            # of used group_id to avoid duplication.
+            self.used_group_ids: Set[Optional[str]] = set()
+            self._parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+
+            dag = dag or DagContext.get_current_dag()
+
+            if not parent_group and not dag:
+                raise AirflowException("TaskGroup can only be used inside a dag")
+
+            self._parent_group = parent_group or TaskGroupContext.get_current_task_group(dag)
+            if not self._parent_group:
+                raise AirflowException("TaskGroup must have a parent_group except for the root TaskGroup")
+            self.used_group_ids = self._parent_group.used_group_ids
+
+        self._group_id = group_id
+        if self.group_id in self.used_group_ids:
+            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
+        self.used_group_ids.add(self.group_id)
+        self.used_group_ids.add(self.downstream_join_id)
+        self.used_group_ids.add(self.upstream_join_id)
+        self.children: Dict[str, Union["BaseOperator", "TaskGroup"]] = {}
+        if self._parent_group:
+            self._parent_group.add(self)
+
+        self.tooltip = tooltip
+        self.ui_color = ui_color
+        self.ui_fgcolor = ui_fgcolor
+
+        # Keep track of TaskGroups or tasks that depend on this entire TaskGroup separately
+        # so that we can optimize the number of edges when entire TaskGroups depend on each other.
+        self.upstream_group_ids: Set[Optional[str]] = set()
+        self.downstream_group_ids: Set[Optional[str]] = set()
+        self.upstream_task_ids: Set[Optional[str]] = set()
+        self.downstream_task_ids: Set[Optional[str]] = set()
+
+    @classmethod
+    def create_root(cls, dag: "DAG"):
+        """
+        Create a root TaskGroup with no group_id or parent.
+        """
+        return cls(group_id=None, dag=dag)
+
+    @property
+    def is_root(self):
+        """
+        Returns True if this TaskGroup is the root TaskGroup, otherwise False.
+        """
+        return not self.group_id
+
+    def __iter__(self):
+        for child in self.children.values():
+            if isinstance(child, TaskGroup):
+                for inner_task in child:
+                    yield inner_task
+            else:
+                yield child
+
+    def add(self, task: Union["BaseOperator", "TaskGroup"]) -> None:
+        """
+        Add a task to this TaskGroup.
+        """
+        key = task.group_id if isinstance(task, TaskGroup) else task.task_id
+
+        if key in self.children:
+            raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
+
+        if isinstance(task, TaskGroup):
+            if task.children:
+                raise AirflowException("Cannot add a non-empty TaskGroup")
+
+        self.children[key] = task  # type: ignore
+
+    @property
+    def group_id(self) -> Optional[str]:
+        """
+        group_id of this TaskGroup.
+        """
+        if self._parent_group and self._parent_group.prefix_group_id and self._parent_group.group_id:
+            return self._parent_group.child_id(self._group_id)
+
+        return self._group_id
+
+    @property
+    def label(self):
+        """
+        group_id excluding parent's group_id used as the node label in UI.
+        """
+        return self._group_id
+
+    def _set_relative(
+            self,
+            task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"],
+            upstream: bool = False
+    ) -> None:
+        """
+        Call set_upstream/set_downstream for all root/leaf tasks within this TaskGroup.
+        Update upstream_group_ids/downstream_group_ids/upstream_task_ids/downstream_task_ids.
+        """
+        from airflow.models.baseoperator import BaseOperator
+
+        if upstream:
+            for task in self.get_roots():
+                task.set_upstream(task_or_task_list)
+        else:
+            for task in self.get_leaves():
+                task.set_downstream(task_or_task_list)
+
+        # Update upstream_group_ids/downstream_group_ids/upstream_task_ids/downstream_task_ids
+        # accordingly so that we can reduce the number of edges when displaying Graph View.
+        if isinstance(task_or_task_list, TaskGroup):
+            # Handles TaskGroup and TaskGroup
+            if upstream:
+                parent, child = (self, task_or_task_list)
+            else:
+                parent, child = (task_or_task_list, self)
+
+            parent.upstream_group_ids.add(child.group_id)
+            child.downstream_group_ids.add(parent.group_id)
+        else:
+            # Handles TaskGroup and task or list of tasks
+            try:
+                task_list = list(task_or_task_list)  # type: ignore
+            except TypeError:
+                task_list = [task_or_task_list]  # type: ignore
+
+            for task in task_list:
+                if not isinstance(task, BaseOperator):
+                    raise AirflowException("Relationships can only be set between TaskGroup or operators; "
+                                           f"received {task.__class__.__name__}")
+
+                if upstream:
+                    self.upstream_task_ids.add(task.task_id)
+                else:
+                    self.downstream_task_ids.add(task.task_id)
+
+    def set_downstream(
+        self, task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"]
+    ) -> None:
+        """
+        Set a TaskGroup/task/list of tasks downstream of this TaskGroup.
+        """
+        self._set_relative(task_or_task_list, upstream=False)
+
+    def set_upstream(
+        self, task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"]
+    ) -> None:
+        """
+        Set a TaskGroup/task/list of tasks upstream of this TaskGroup.
+        """
+        self._set_relative(task_or_task_list, upstream=True)
+
+    def __enter__(self):
+        TaskGroupContext.push_context_managed_task_group(self)
+        return self
+
+    def __exit__(self, _type, _value, _tb):
+        TaskGroupContext.pop_context_managed_task_group()
+
+    def has_task(self, task: "BaseOperator") -> bool:
+        """
+        Returns True if this TaskGroup or any of its child TaskGroups contains the given task.
+        """
+        if task.task_id in self.children:
+            return True
+
+        return any(child.has_task(task) for child in self.children.values() if isinstance(child, TaskGroup))
+
+    def get_roots(self) -> Generator["BaseOperator", None, None]:
+        """
+        Returns a generator of tasks that are root tasks, i.e. those with no upstream
+        dependencies within the TaskGroup.
+        """
+        for task in self:
+            if not any(self.has_task(parent) for parent in task.get_direct_relatives(upstream=True)):
+                yield task
+
+    def get_leaves(self) -> Generator["BaseOperator", None, None]:
+        """
+        Returns a generator of tasks that are leaf tasks, i.e. those with no downstream
+        dependencies within the TaskGroup.
+        """
+        for task in self:
+            if not any(self.has_task(child) for child in task.get_direct_relatives(upstream=False)):
+                yield task
+
+    def __rshift__(self, other):
+        """
+        Implements Self >> Other == self.set_downstream(other)
+        """
+        self.set_downstream(other)
+        return other
+
+    def __lshift__(self, other):
+        """
+        Implements Self << Other == self.set_upstream(other)
+        """
+        self.set_upstream(other)
+        return other
+
+    def __rrshift__(self, other):
+        """
+        Called for [Operator] >> TaskGroup because lists don't have
+        __rshift__ operators.
+        """
+        self.__lshift__(other)
+        return self
+
+    def __rlshift__(self, other):
+        """
+        Called for [Operator] << TaskGroup because lists don't have
+        __lshift__ operators.
+        """
+        self.__rshift__(other)
+        return self
+
+    def child_id(self, label):
+        """
+        Prefix label with group_id if prefix_group_id is True. Otherwise return the label
+        as-is.
+        """
+        if self.prefix_group_id and self.group_id:
+            return f"{self.group_id}.{label}"
+
+        return label
+
+    @property
+    def upstream_join_id(self):

Review comment:
       ```suggestion
       def upstream_join_id(self) -> str:
   ```
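The `__rrshift__`/`__rlshift__` methods quoted above exist because a Python list has no `__rshift__` that knows about TaskGroup, so for `[task_a, task_b] >> group` Python falls back to the right operand's reflected method. Below is a minimal pure-Python sketch of that dispatch. `Node` and `Group` are hypothetical stand-ins, not Airflow classes, and the group simply treats every member as both a root and a leaf.

```python
from typing import List, Sequence, Set, Union


class Node:
    """Hypothetical stand-in for an operator: an id plus upstream/downstream id sets."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: Set[str] = set()
        self.downstream: Set[str] = set()

    def set_downstream(self, others: Union["Node", Sequence["Node"]]) -> None:
        # Accept a single node or a list of nodes (groups on the right are out of scope here).
        for other in ([others] if isinstance(others, Node) else others):
            self.downstream.add(other.task_id)
            other.upstream.add(self.task_id)

    def set_upstream(self, others: Union["Node", Sequence["Node"]]) -> None:
        for other in ([others] if isinstance(others, Node) else others):
            other.set_downstream(self)


class Group:
    """Hypothetical stand-in for TaskGroup: every member is treated as root and leaf."""

    def __init__(self, members: List[Node]) -> None:
        self.members = members

    def set_downstream(self, other) -> None:
        for task in self.members:
            task.set_downstream(other)

    def set_upstream(self, other) -> None:
        for task in self.members:
            task.set_upstream(other)

    def __rshift__(self, other):   # group >> other
        self.set_downstream(other)
        return other

    def __lshift__(self, other):   # group << other
        self.set_upstream(other)
        return other

    def __rrshift__(self, other):  # [a, b] >> group: list has no __rshift__, so this runs
        self.__lshift__(other)
        return self                # returning the group allows `... >> group >> end`

    def __rlshift__(self, other):  # [a, b] << group
        self.__rshift__(other)
        return self


task_a, task_b, end = Node("task_a"), Node("task_b"), Node("end")
group = Group([Node("group.x"), Node("group.y")])

# Left to right: Group.__rrshift__ handles the list, then Group.__rshift__ handles `end`.
[task_a, task_b] >> group >> end
```

Returning `self` from the reflected methods is what lets the chain continue after the list.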




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-688622957


   > I love this change! This DAG representation is nice and clean. I'm just wondering if this will work with the AIP-31 approach.
   
   This would be awesome.
   
   Actually, I started a discussion on Anna's article https://towardsdatascience.com/managing-dependencies-between-data-pipelines-in-apache-airflow-prefect-f4eba65886df (there is a thread where we discuss it with the author) about new ways of thinking about building DAGs once the Task Group concept is merged.
   
   I think this makes it possible to start thinking a bit differently about building complex DAGs and reusing parts of the logic between different DAGs (which Task Group makes very easy to visualize). It was possible before, but Task Group makes reusable "DAG fragments" a "first-class citizen" that you will be able to reuse across different DAGs. When paired with the DAG trigger/DAG wait operator/sensor, I think this is a complete solution that makes it possible to implement the arbitrarily complex "dependencies" between pipelines that Anna wrote about in an elegant way, without introducing unnecessary complexity on the processing side.
   
   Having a functional interface to do that would make it even better and would allow DAG developers to rethink the way they develop DAGs.
   
   It sounds very exciting to be able to join the AIP-31 and AIP-34 results into a coherent and nice approach.
   





[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484263778



##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,392 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+A TaskGroup is a collection of closely related tasks on the same DAG that should be grouped
+together when the DAG is displayed graphically.
+"""
+
+from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException, DuplicateTaskIdFound
+
+if TYPE_CHECKING:
+    from airflow.models.baseoperator import BaseOperator
+    from airflow.models.dag import DAG
+
+
+class TaskGroup:
+    """
+    A collection of tasks. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+
+    :param group_id: a unique, meaningful id for the TaskGroup. group_id must not conflict
+        with group_id of TaskGroup or task_id of tasks in the DAG. Root TaskGroup has group_id
+        set to None.
+    :type group_id: str
+    :param prefix_group_id: If set to True, child task_id and group_id will be prefixed with
+        this TaskGroup's group_id. If set to False, child task_id and group_id are not prefixed.
+        Default is True.
+    :type prefix_group_id: bool
+    :param parent_group: The parent TaskGroup of this TaskGroup. parent_group is set to None
+        for the root TaskGroup.
+    :type parent_group: TaskGroup
+    :param dag: The DAG that this TaskGroup belongs to.
+    :type dag: airflow.models.DAG
+    :param tooltip: The tooltip of the TaskGroup node when displayed in the UI
+    :type tooltip: str
+    :param ui_color: The fill color of the TaskGroup node when displayed in the UI
+    :type ui_color: str
+    :param ui_fgcolor: The label color of the TaskGroup node when displayed in the UI
+    :type ui_fgcolor: str
+    """
+
+    def __init__(
+        self,
+        group_id: Optional[str],
+        prefix_group_id: bool = True,
+        parent_group: Optional["TaskGroup"] = None,
+        dag: Optional["DAG"] = None,
+        tooltip: str = "",
+        ui_color: str = "CornflowerBlue",
+        ui_fgcolor: str = "#000",
+    ):
+        from airflow.models.dag import DagContext
+
+        self.prefix_group_id = prefix_group_id
+
+        if group_id is None:
+            # This creates a root TaskGroup.
+            if parent_group:
+                raise AirflowException("Root TaskGroup cannot have parent_group")
+            # used_group_ids is shared across all TaskGroups in the same DAG to keep track
+            # of used group_id to avoid duplication.
+            self.used_group_ids: Set[Optional[str]] = set()
+            self._parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+
+            dag = dag or DagContext.get_current_dag()
+
+            if not parent_group and not dag:
+                raise AirflowException("TaskGroup can only be used inside a dag")
+
+            self._parent_group = parent_group or TaskGroupContext.get_current_task_group(dag)
+            if not self._parent_group:
+                raise AirflowException("TaskGroup must have a parent_group except for the root TaskGroup")
+            self.used_group_ids = self._parent_group.used_group_ids
+
+        self._group_id = group_id
+        if self.group_id in self.used_group_ids:
+            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
+        self.used_group_ids.add(self.group_id)
+        self.used_group_ids.add(self.downstream_join_id)
+        self.used_group_ids.add(self.upstream_join_id)
+        self.children: Dict[str, Union["BaseOperator", "TaskGroup"]] = {}
+        if self._parent_group:
+            self._parent_group.add(self)
+
+        self.tooltip = tooltip
+        self.ui_color = ui_color
+        self.ui_fgcolor = ui_fgcolor
+
+        # Keep track of TaskGroups or tasks that depend on this entire TaskGroup separately
+        # so that we can optimize the number of edges when entire TaskGroups depend on each other.
+        self.upstream_group_ids: Set[Optional[str]] = set()
+        self.downstream_group_ids: Set[Optional[str]] = set()
+        self.upstream_task_ids: Set[Optional[str]] = set()
+        self.downstream_task_ids: Set[Optional[str]] = set()
+
+    @classmethod
+    def create_root(cls, dag: "DAG"):
+        """
+        Create a root TaskGroup with no group_id or parent.
+        """
+        return cls(group_id=None, dag=dag)
+
+    @property
+    def is_root(self):
+        """
+        Returns True if this TaskGroup is the root TaskGroup, otherwise False.
+        """
+        return not self.group_id
+
+    def __iter__(self):
+        for child in self.children.values():
+            if isinstance(child, TaskGroup):
+                for inner_task in child:
+                    yield inner_task
+            else:
+                yield child
+
+    def add(self, task: Union["BaseOperator", "TaskGroup"]) -> None:
+        """
+        Add a task to this TaskGroup.
+        """
+        key = task.group_id if isinstance(task, TaskGroup) else task.task_id
+
+        if key in self.children:
+            raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
+
+        if isinstance(task, TaskGroup):
+            if task.children:
+                raise AirflowException("Cannot add a non-empty TaskGroup")
+
+        self.children[key] = task  # type: ignore
+
+    @property
+    def group_id(self) -> Optional[str]:
+        """
+        group_id of this TaskGroup.
+        """
+        if self._parent_group and self._parent_group.prefix_group_id and self._parent_group.group_id:
+            return self._parent_group.child_id(self._group_id)
+
+        return self._group_id
+
+    @property
+    def label(self):

Review comment:
       ```suggestion
       def label(self) -> str:
   ```
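On the `label` annotation, a small sketch of how `label`, `group_id` and `child_id` relate for nested groups may help. `GroupSketch` is a hypothetical, simplified copy of the quoted logic, not Airflow's class. Note that for the root group `_group_id` is None, so in this sketch `Optional[str]` is the accurate return type for `label`; `-> str` would only hold for non-root groups.

```python
from typing import Optional


class GroupSketch:
    """Hypothetical, simplified model of the quoted TaskGroup id logic."""

    def __init__(self, group_id: Optional[str], parent: Optional["GroupSketch"] = None,
                 prefix_group_id: bool = True) -> None:
        self._group_id = group_id
        self._parent_group = parent
        self.prefix_group_id = prefix_group_id

    @property
    def group_id(self) -> Optional[str]:
        # Prefix only when the parent prefixes and is not the root (root group_id is None).
        parent = self._parent_group
        if parent and parent.prefix_group_id and parent.group_id:
            return parent.child_id(self._group_id)
        return self._group_id

    @property
    def label(self) -> Optional[str]:
        # The un-prefixed id, used as the node label in the UI.
        return self._group_id

    def child_id(self, label: str) -> str:
        if self.prefix_group_id and self.group_id:
            return f"{self.group_id}.{label}"
        return label


root = GroupSketch(None)
section_1 = GroupSketch("section_1", parent=root)
inner = GroupSketch("inside_section_1", parent=section_1)
```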







[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-688834630


   > > I love this change! This DAG representation is nice and clean. I'm just wondering if this will work with the AIP-31 approach.
   > 
   > This would be awesome.
   > 
   > Actually, I started a discussion on Anna's article https://towardsdatascience.com/managing-dependencies-between-data-pipelines-in-apache-airflow-prefect-f4eba65886df (there is a thread where we discuss it with the author) about new ways of thinking about building DAGs once the Task Group concept is merged.
   > 
   > I think this makes it possible to start thinking a bit differently about building complex DAGs and reusing parts of the logic between different DAGs (which Task Group makes very easy to visualize). It was possible before, but Task Group makes reusable "DAG fragments" a "first-class citizen" that you will be able to reuse across different DAGs. When paired with the DAG trigger/DAG wait operator/sensor, I think this is a complete solution that makes it possible to implement the arbitrarily complex "dependencies" between pipelines that Anna wrote about in an elegant way, without introducing unnecessary complexity on the processing side.
   > 
   > Having a functional interface to do that would make it even better and would allow DAG developers to rethink the way they develop DAGs.
   > 
   > It sounds very exciting to be able to join the AIP-31 and AIP-34 results into a coherent and nice approach.
   
   Hi @potiuk, thanks for the comment and the interesting article link. I totally agree that with `TaskGroup` in mind, we will have a new way of building arbitrarily complex interdependent workflows. In the past we would try hard to put these individual workflows onto separate DAGs and link them up using `ExternalTaskSensor`, `ExternalTaskMarker`, `SubDagOperator`, `TriggerDagRunOperator`, etc. With `TaskGroup`, we have one more option: just put the related workflows all in the same DAG and link them up normally like simple tasks. It's also possible to write functions that return a `TaskGroup` object, which can be reused in different DAGs to share the same workflow.
   
   That said, I'm not sure we should really combine the two AIPs. To go along with the idea, I included an example in the previous reply [here](https://github.com/apache/airflow/pull/10153#issuecomment-688778222) showing the `@task` decorator and `TaskGroup` used together. They seem to work well together even though they were born out of two different AIPs.

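To illustrate the point about functions that return a reusable workflow, here is a minimal sketch in which a hypothetical factory, `build_etl_fragment`, adds the same prefixed chain of tasks to whichever DAG it is given. `MiniDag` and `Fragment` are illustrative stand-ins, not Airflow APIs; with the real classes the factory would presumably create operators inside a `TaskGroup` and return that group.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class MiniDag:
    """Hypothetical DAG stand-in that only records task ids and edges."""
    dag_id: str
    task_ids: List[str] = field(default_factory=list)
    edges: List[Tuple[str, str]] = field(default_factory=list)


@dataclass
class Fragment:
    """What the factory hands back: the group id plus its first and last task ids."""
    group_id: str
    first: str
    last: str


def build_etl_fragment(dag: MiniDag, group_id: str) -> Fragment:
    """Hypothetical reusable fragment: extract >> transform >> load, prefixed by group_id."""
    ids = [f"{group_id}.{step}" for step in ("extract", "transform", "load")]
    dag.task_ids.extend(ids)
    dag.edges.extend(zip(ids, ids[1:]))  # chain the steps in order
    return Fragment(group_id, first=ids[0], last=ids[-1])


# The same fragment reused twice in one DAG and once in another.
daily = MiniDag("daily")
orders = build_etl_fragment(daily, "orders")
customers = build_etl_fragment(daily, "customers")
daily.edges.append((orders.last, customers.first))  # orders >> customers

weekly = MiniDag("weekly")
build_etl_fragment(weekly, "orders")
```

Because the group id is part of every task id, the two uses inside `daily` do not clash, and `weekly` gets an identical copy of the `orders` workflow.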




[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-686359741


   Hi, @houqp and @casassg , as we discussed in [this comment](https://github.com/apache/airflow/pull/10153#discussion_r479914159) and this email ["[AIP-34] Rewrite SubDagOperator"](https://lists.apache.org/thread.html/r2e62854d1fdcee2627dcfb724c90a7215d2b325c5736905f62c53332%40%3Cdev.airflow.apache.org%3E), I added a flag `prefix_group_id` to `TaskGroup`. It defaults to `True`. If we don't want the `task_id` to be prefixed by its `TaskGroup`, we can pass `prefix_group_id=False` when creating the `TaskGroup`. That's it. This makes `TaskGroup` easy to use for most cases, and also allows preserving the original `task_id` if the user really wants to.
   
   Please take another look at the PR.
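A minimal sketch of what the `prefix_group_id` flag changes, assuming (as the quoted constructor does with `used_group_ids`) that every effective id is recorded in one DAG-wide set: with prefixing, the same label can be reused in two groups; without it, the original `task_id` is preserved but a second use of the same label collides. `effective_task_id` and `DuplicateIdError` are hypothetical; the latter stands in for `DuplicateTaskIdFound`.

```python
from typing import Set


class DuplicateIdError(Exception):
    """Hypothetical stand-in for airflow.exceptions.DuplicateTaskIdFound."""


def effective_task_id(label: str, group_id: str, prefix_group_id: bool, used_ids: Set[str]) -> str:
    """Compute the task_id a task would get and register it in a DAG-wide set of used ids."""
    task_id = f"{group_id}.{label}" if prefix_group_id else label
    if task_id in used_ids:
        raise DuplicateIdError(f"Task id '{task_id}' has already been added to the DAG")
    used_ids.add(task_id)
    return task_id


used: Set[str] = set()
prefixed_1 = effective_task_id("load", "section_1", True, used)    # "section_1.load"
prefixed_2 = effective_task_id("load", "section_2", True, used)    # "section_2.load"
unprefixed = effective_task_id("load", "section_3", False, used)  # "load": original id kept

try:
    effective_task_id("load", "section_4", False, used)  # same label, no prefix: collision
    collided = False
except DuplicateIdError:
    collided = True
```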





[GitHub] [airflow] houqp commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r472502513



##########
File path: airflow/models/baseoperator.py
##########
@@ -382,7 +389,16 @@ def __init__(
                 stacklevel=3
             )
         validate_key(task_id)
-        self.task_id = task_id
+        self.label = task_id
+
+        # Prefix task_id with group_id
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        if task_group:
+            self.task_id = f"{task_group.group_id}.{self.label}" if task_group.group_id else self.label

Review comment:
       It looks like we are still changing the semantics of `task_id` here. Would it make more sense to use `self.label` to store `f"{task_group.group_id}.{self.label}"` and leave `task_id` as is?

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -626,3 +648,59 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
         if ver != cls.SERIALIZER_VERSION:
             raise ValueError("Unsure how to deserialize version {!r}".format(ver))
         return cls.deserialize_dag(serialized_obj['dag'])
+
+
+class SerializedTaskGroup(TaskGroup, BaseSerialization):
+    """
+    A JSON serializable representation of TaskGroup.
+    """
+    @classmethod
+    def serialize_task_group(cls, task_group: TaskGroup) -> Optional[Union[Dict[str, Any]]]:
+        """Serializes TaskGroup into a JSON object.
+        """
+        if not task_group:
+            return None
+
+        serialize_group = {}
+        serialize_group["_group_id"] = task_group._group_id  # pylint: disable=protected-access
+
+        serialize_group['children'] = {  # type: ignore
+            label: (DAT.OP, child.task_id)
+            if isinstance(child, BaseOperator) else
+            (DAT.TASK_GROUP, SerializedTaskGroup.serialize_task_group(child))
+            for label, child in task_group.children.items()
+        }
+        serialize_group['tooltip'] = task_group.tooltip
+        serialize_group['ui_color'] = task_group.ui_color
+        serialize_group['ui_fgcolor'] = task_group.ui_fgcolor

Review comment:
       Nitpick: these can be simplified into a single dict literal:
   
   ```suggestion
           serialize_group = {
               "_group_id": task_group._group_id,  # pylint: disable=protected-access
               "tooltip": task_group.tooltip,
               "ui_color": task_group.ui_color,
               "ui_fgcolor": task_group.ui_fgcolor,
               "children": {  # type: ignore
                   label: (DAT.OP, child.task_id)
                   if isinstance(child, BaseOperator) else
                   (DAT.TASK_GROUP, SerializedTaskGroup.serialize_task_group(child))
                   for label, child in task_group.children.items()
               },
           }
   ```
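The suggested single-literal form can be tried outside Airflow with a minimal sketch. `Op`, `Grp` and the `"operator"`/`"taskgroup"` tags are hypothetical stand-ins for `BaseOperator`, `TaskGroup` and the `DAT.OP`/`DAT.TASK_GROUP` members, whose values are not shown in this diff. Nested groups are serialized recursively, and the result survives a `json` round trip, with tuples coming back as lists.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Union


@dataclass
class Op:
    """Hypothetical stand-in for an operator."""
    task_id: str


@dataclass
class Grp:
    """Hypothetical stand-in for a TaskGroup; children are keyed by task_id/group_id."""
    _group_id: Optional[str]
    tooltip: str = ""
    ui_color: str = "CornflowerBlue"
    ui_fgcolor: str = "#000"
    children: Dict[str, Union[Op, "Grp"]] = field(default_factory=dict)


def serialize_group(group: Optional[Grp]) -> Optional[Dict[str, Any]]:
    """Serialize a group, and recursively its child groups, into a JSON-compatible dict."""
    if group is None:
        return None
    return {
        "_group_id": group._group_id,
        "tooltip": group.tooltip,
        "ui_color": group.ui_color,
        "ui_fgcolor": group.ui_fgcolor,
        "children": {
            label: ("operator", child.task_id)
            if isinstance(child, Op) else
            ("taskgroup", serialize_group(child))
            for label, child in group.children.items()
        },
    }


section_1 = Grp("section_1", tooltip="Tasks for Section 1",
                children={"section_1.task-1": Op("section_1.task-1")})
root = Grp(None, children={"start": Op("start"), "section_1": section_1})

serialized = serialize_group(root)
round_tripped = json.loads(json.dumps(serialized))
```

Key order differs from the original incremental version, which does not matter for a JSON object.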







[GitHub] [airflow] kaxil commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-691026924









[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r480899496



##########
File path: airflow/models/baseoperator.py
##########
@@ -382,7 +389,16 @@ def __init__(
                 stacklevel=3
             )
         validate_key(task_id)
-        self.task_id = task_id
+        self.label = task_id
+
+        # Prefix task_id with group_id
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        if task_group:
+            self.task_id = f"{task_group.group_id}.{self.label}" if task_group.group_id else self.label

Review comment:
       Hi @casassg, thanks for the suggestion. I brought this up in the email "[AIP-34] Rewrite SubDagOperator". Let's see what most people say.







[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484798133



##########
File path: airflow/example_dags/example_task_group.py
##########
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the TaskGroup."""
+
+from airflow.models.dag import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.dates import days_ago
+from airflow.utils.task_group import TaskGroup
+
+
+# [START howto_task_group]
+def create_section():
+    """
+    Create tasks in the outer section.
+    """
+    dummies = [DummyOperator(task_id=f'task-{i + 1}') for i in range(5)]
+
+    with TaskGroup("inside_section_1") as inside_section_1:
+        _ = [DummyOperator(task_id=f'task-{i + 1}',) for i in range(3)]
+
+    with TaskGroup("inside_section_2") as inside_section_2:
+        _ = [DummyOperator(task_id=f'task-{i + 1}',) for i in range(3)]
+
+    dummies[-1] >> inside_section_1
+    dummies[-2] >> inside_section_2
+    inside_section_1 >> inside_section_2
+
+
+with DAG(dag_id="example_task_group", start_date=days_ago(2)) as dag:
+    start = DummyOperator(task_id="start")
+
+    with TaskGroup("section_1", tooltip="Tasks for Section 1") as section_1:
+        create_section()
+
+    some_other_task = DummyOperator(task_id="some-other-task")
+
+    with TaskGroup("section_2", tooltip="Tasks for Section 2") as section_2:
+        create_section()
+
+    end = DummyOperator(task_id='end')
+
+    start >> section_1 >> some_other_task >> section_2 >> end
+# [END howto_task_group]

Review comment:
       Thanks. I'm simplifying the example and adding `START/END`
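For reference, the roots and leaves that `start >> section_1 >> some_other_task` would attach to can be worked out for `create_section()`. The sketch below is a hypothetical pure-Python re-derivation, not Airflow code: it writes down the edges the example creates (labels without the `section_1.` prefix) and applies the rule used by `get_roots()`/`get_leaves()`, namely a task is a root if none of its upstream tasks are in the group and a leaf if none of its downstream tasks are. It assumes `task >> group` and `group >> group` expand to edges onto a group's roots and from its leaves, and that every task of the two inner groups is both a root and a leaf there, since they have no internal dependencies.

```python
from typing import List, Tuple


def create_section_graph() -> Tuple[List[str], List[Tuple[str, str]]]:
    """Task labels and edges produced by create_section() in the example DAG."""
    outer = [f"task-{i + 1}" for i in range(5)]
    inner_1 = [f"inside_section_1.task-{i + 1}" for i in range(3)]
    inner_2 = [f"inside_section_2.task-{i + 1}" for i in range(3)]
    edges = [(outer[-1], t) for t in inner_1]            # dummies[-1] >> inside_section_1
    edges += [(outer[-2], t) for t in inner_2]           # dummies[-2] >> inside_section_2
    edges += [(u, d) for u in inner_1 for d in inner_2]  # inside_section_1 >> inside_section_2
    return outer + inner_1 + inner_2, edges


def roots_and_leaves(tasks: List[str], edges: List[Tuple[str, str]]) -> Tuple[List[str], List[str]]:
    """Roots have no upstream task inside the group; leaves have no downstream task inside it."""
    members = set(tasks)
    internal = [(u, d) for u, d in edges if u in members and d in members]
    has_upstream = {d for _, d in internal}
    has_downstream = {u for u, _ in internal}
    roots = [t for t in tasks if t not in has_upstream]
    leaves = [t for t in tasks if t not in has_downstream]
    return roots, leaves


tasks, edges = create_section_graph()
roots, leaves = roots_and_leaves(tasks, edges)
```

Under these assumptions, `start` would gain five downstream edges (`task-1` to `task-5`) and `some-other-task` six upstream edges from `section_1`.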







[GitHub] [airflow] turbaszek commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-690924584









[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r472097920



##########
File path: airflow/models/baseoperator.py
##########
@@ -380,7 +387,15 @@ def __init__(
                 stacklevel=3
             )
         validate_key(task_id)
-        self.task_id = task_id
+        self._task_id = task_id

Review comment:
       You are right. I'm reverting this back to `self.task_id`.
   
   That said, I don't think we should allow people to change the `task_id` of a task once it's set. That leaves `task_dict` with a stale key and will certainly cause bugs, so it may be better to make `task_id` a read-only property. That is out of the scope of this PR, so I'll leave things in their original state for now.
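A minimal sketch of the read-only `task_id` idea, using a hypothetical `Operator` stand-in rather than Airflow's `BaseOperator`: the ID is stored once in `__init__`, the task is registered under that key, and the property has no setter, so a later rename raises `AttributeError` instead of silently desynchronizing the dict.

```python
from typing import Dict


class Operator:
    """Hypothetical stand-in for an operator whose task_id is fixed at creation."""

    def __init__(self, task_id: str, task_dict: Dict[str, "Operator"]) -> None:
        self._task_id = task_id
        # Register under the ID exactly once, so the dict key can never drift
        # away from the attribute value.
        task_dict[task_id] = self

    @property
    def task_id(self) -> str:
        # No setter is defined, so `op.task_id = ...` raises AttributeError.
        return self._task_id


task_dict: Dict[str, Operator] = {}
op = Operator("extract", task_dict)

try:
    op.task_id = "load"
    renamed = True
except AttributeError:
    renamed = False

print(renamed, list(task_dict))  # False ['extract']
```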







[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484265151



##########
File path: airflow/utils/task_group.py
##########
@@ -0,0 +1,392 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+A TaskGroup is a collection of closely related tasks on the same DAG that should be grouped
+together when the DAG is displayed graphically.
+"""
+
+from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Sequence, Set, Union
+
+from airflow.exceptions import AirflowException, DuplicateTaskIdFound
+
+if TYPE_CHECKING:
+    from airflow.models.baseoperator import BaseOperator
+    from airflow.models.dag import DAG
+
+
+class TaskGroup:
+    """
+    A collection of tasks. When set_downstream() or set_upstream() are called on the
+    TaskGroup, it is applied across all tasks within the group if necessary.
+
+    :param group_id: a unique, meaningful id for the TaskGroup. group_id must not conflict
+        with group_id of TaskGroup or task_id of tasks in the DAG. Root TaskGroup has group_id
+        set to None.
+    :type group_id: str
+    :param prefix_group_id: If set to True, child task_id and group_id will be prefixed with
+        this TaskGroup's group_id. If set to False, child task_id and group_id are not prefixed.
+        Default is True.
+    :type prefix_group_id: bool
+    :param parent_group: The parent TaskGroup of this TaskGroup. parent_group is set to None
+        for the root TaskGroup.
+    :type parent_group: TaskGroup
+    :param dag: The DAG that this TaskGroup belongs to.
+    :type dag: airflow.models.DAG
+    :param tooltip: The tooltip of the TaskGroup node when displayed in the UI
+    :type tooltip: str
+    :param ui_color: The fill color of the TaskGroup node when displayed in the UI
+    :type ui_color: str
+    :param ui_fgcolor: The label color of the TaskGroup node when displayed in the UI
+    :type ui_fgcolor: str
+    """
+
+    def __init__(
+        self,
+        group_id: Optional[str],
+        prefix_group_id: bool = True,
+        parent_group: Optional["TaskGroup"] = None,
+        dag: Optional["DAG"] = None,
+        tooltip: str = "",
+        ui_color: str = "CornflowerBlue",
+        ui_fgcolor: str = "#000",
+    ):
+        from airflow.models.dag import DagContext
+
+        self.prefix_group_id = prefix_group_id
+
+        if group_id is None:
+            # This creates a root TaskGroup.
+            if parent_group:
+                raise AirflowException("Root TaskGroup cannot have parent_group")
+            # used_group_ids is shared across all TaskGroups in the same DAG to keep track
+            # of used group_id to avoid duplication.
+            self.used_group_ids: Set[Optional[str]] = set()
+            self._parent_group = None
+        else:
+            if not isinstance(group_id, str):
+                raise ValueError("group_id must be str")
+            if not group_id:
+                raise ValueError("group_id must not be empty")
+
+            dag = dag or DagContext.get_current_dag()
+
+            if not parent_group and not dag:
+                raise AirflowException("TaskGroup can only be used inside a dag")
+
+            self._parent_group = parent_group or TaskGroupContext.get_current_task_group(dag)
+            if not self._parent_group:
+                raise AirflowException("TaskGroup must have a parent_group except for the root TaskGroup")
+            self.used_group_ids = self._parent_group.used_group_ids
+
+        self._group_id = group_id
+        if self.group_id in self.used_group_ids:
+            raise DuplicateTaskIdFound(f"group_id '{self.group_id}' has already been added to the DAG")
+        self.used_group_ids.add(self.group_id)
+        self.used_group_ids.add(self.downstream_join_id)
+        self.used_group_ids.add(self.upstream_join_id)
+        self.children: Dict[str, Union["BaseOperator", "TaskGroup"]] = {}
+        if self._parent_group:
+            self._parent_group.add(self)
+
+        self.tooltip = tooltip
+        self.ui_color = ui_color
+        self.ui_fgcolor = ui_fgcolor
+
+        # Keep track of TaskGroups or tasks that depend on this entire TaskGroup separately
+        # so that we can optimize the number of edges when entire TaskGroups depend on each other.
+        self.upstream_group_ids: Set[Optional[str]] = set()
+        self.downstream_group_ids: Set[Optional[str]] = set()
+        self.upstream_task_ids: Set[Optional[str]] = set()
+        self.downstream_task_ids: Set[Optional[str]] = set()
+
+    @classmethod
+    def create_root(cls, dag: "DAG"):
+        """
+        Create a root TaskGroup with no group_id or parent.
+        """
+        return cls(group_id=None, dag=dag)
+
+    @property
+    def is_root(self):
+        """
+        Returns True if this TaskGroup is the root TaskGroup. Otherwise False
+        """
+        return not self.group_id
+
+    def __iter__(self):
+        for child in self.children.values():
+            if isinstance(child, TaskGroup):
+                for inner_task in child:
+                    yield inner_task
+            else:
+                yield child
+
+    def add(self, task: Union["BaseOperator", "TaskGroup"]) -> None:
+        """
+        Add a task to this TaskGroup.
+        """
+        key = task.group_id if isinstance(task, TaskGroup) else task.task_id
+
+        if key in self.children:
+            raise DuplicateTaskIdFound(f"Task id '{key}' has already been added to the DAG")
+
+        if isinstance(task, TaskGroup):
+            if task.children:
+                raise AirflowException("Cannot add a non-empty TaskGroup")
+
+        self.children[key] = task  # type: ignore
+
+    @property
+    def group_id(self) -> Optional[str]:
+        """
+        group_id of this TaskGroup.
+        """
+        if self._parent_group and self._parent_group.prefix_group_id and self._parent_group.group_id:
+            return self._parent_group.child_id(self._group_id)
+
+        return self._group_id
+
+    @property
+    def label(self):
+        """
+        group_id excluding parent's group_id used as the node label in UI.
+        """
+        return self._group_id
+
+    def _set_relative(
+            self,
+            task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"],
+            upstream: bool = False
+    ) -> None:
+        """
+        Call set_upstream/set_downstream for all root/leaf tasks within this TaskGroup.
+        Update upstream_group_ids/downstream_group_ids/upstream_task_ids/downstream_task_ids.
+        """
+        from airflow.models.baseoperator import BaseOperator
+
+        if upstream:
+            for task in self.get_roots():
+                task.set_upstream(task_or_task_list)
+        else:
+            for task in self.get_leaves():
+                task.set_downstream(task_or_task_list)
+
+        # Update upstream_group_ids/downstream_group_ids/upstream_task_ids/downstream_task_ids
+        # accordingly so that we can reduce the number of edges when displaying Graph View.
+        if isinstance(task_or_task_list, TaskGroup):
+            # Handles TaskGroup and TaskGroup
+            if upstream:
+                parent, child = (self, task_or_task_list)
+            else:
+                parent, child = (task_or_task_list, self)
+
+            parent.upstream_group_ids.add(child.group_id)
+            child.downstream_group_ids.add(parent.group_id)
+        else:
+            # Handles TaskGroup and task or list of tasks
+            try:
+                task_list = list(task_or_task_list)  # type: ignore
+            except TypeError:
+                task_list = [task_or_task_list]  # type: ignore
+
+            for task in task_list:
+                if not isinstance(task, BaseOperator):
+                    raise AirflowException("Relationships can only be set between TaskGroup or operators; "
+                                           f"received {task.__class__.__name__}")
+
+                if upstream:
+                    self.upstream_task_ids.add(task.task_id)
+                else:
+                    self.downstream_task_ids.add(task.task_id)
+
+    def set_downstream(
+        self, task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"]
+    ) -> None:
+        """
+        Set a TaskGroup/task/list of tasks downstream of this TaskGroup.
+        """
+        self._set_relative(task_or_task_list, upstream=False)
+
+    def set_upstream(
+        self, task_or_task_list: Union['BaseOperator', Sequence['BaseOperator'], "TaskGroup"]
+    ) -> None:
+        """
+        Set a TaskGroup/task/list of tasks upstream of this TaskGroup.
+        """
+        self._set_relative(task_or_task_list, upstream=True)
+
+    def __enter__(self):
+        TaskGroupContext.push_context_managed_task_group(self)
+        return self
+
+    def __exit__(self, _type, _value, _tb):
+        TaskGroupContext.pop_context_managed_task_group()
+
+    def has_task(self, task: "BaseOperator") -> bool:
+        """
+        Returns True if this TaskGroup or its child TaskGroups contain the given task.
+        """
+        if task.task_id in self.children:
+            return True
+
+        return any(child.has_task(task) for child in self.children.values() if isinstance(child, TaskGroup))
+
+    def get_roots(self) -> Generator["BaseOperator", None, None]:
+        """
+        Returns a generator of tasks that are root tasks, i.e. those with no upstream
+        dependencies within the TaskGroup.
+        """
+        for task in self:
+            if not any(self.has_task(parent) for parent in task.get_direct_relatives(upstream=True)):
+                yield task
+
+    def get_leaves(self) -> Generator["BaseOperator", None, None]:
+        """
+        Returns a generator of tasks that are leaf tasks, i.e. those with no downstream
+        dependencies within the TaskGroup
+        """
+        for task in self:
+            if not any(self.has_task(child) for child in task.get_direct_relatives(upstream=False)):
+                yield task
+
+    def __rshift__(self, other):
+        """
+        Implements Self >> Other == self.set_downstream(other)
+        """
+        self.set_downstream(other)
+        return other
+
+    def __lshift__(self, other):
+        """
+        Implements Self << Other == self.set_upstream(other)
+        """
+        self.set_upstream(other)
+        return other
+
+    def __rrshift__(self, other):
+        """
+        Called for Operator >> [Operator] because lists don't have
+        __rshift__ operators.
+        """
+        self.__lshift__(other)
+        return self
+
+    def __rlshift__(self, other):
+        """
+        Called for Operator << [Operator] because lists don't have
+        __lshift__ operators.
+        """
+        self.__rshift__(other)
+        return self
+
+    def child_id(self, label):
+        """
+        Prefix label with group_id if prefix_group_id is True. Otherwise return the label
+        as-is.
+        """
+        if self.prefix_group_id and self.group_id:
+            return f"{self.group_id}.{label}"
+
+        return label
+
+    @property
+    def upstream_join_id(self):
+        """
+        If this TaskGroup has immediate upstream TaskGroups or tasks, a dummy node called
+        upstream_join_id will be created in Graph View to join the incoming edges to this
+        TaskGroup to reduce the total number of edges needed to be displayed.
+        """
+        return f"{self.group_id}.upstream_join_id"
+
+    @property
+    def downstream_join_id(self):
+        """
+        If this TaskGroup has immediate downstream TaskGroups or tasks, a dummy node called
+        downstream_join_id will be created in Graph View to join the outgoing edges from this
+        TaskGroup to reduce the total number of edges needed to be displayed.
+        """
+        return f"{self.group_id}.downstream_join_id"
+
+    def get_task_group_dict(self) -> Dict[str, "TaskGroup"]:
+        """
+        Returns a flat dictionary of group_id: TaskGroup
+        """
+        task_group_map = {}
+
+        def build_map(task_group):
+            if not isinstance(task_group, TaskGroup):
+                return
+
+            task_group_map[task_group.group_id] = task_group
+
+            for child in task_group.children.values():
+                build_map(child)
+
+        build_map(self)
+        return task_group_map
+
+    def get_child_by_label(self, label):
+        """
+        Get a child task/TaskGroup by its label (i.e. task_id/group_id without the group_id prefix)
+        """
+        return self.children[self.child_id(label)]
+
+
+class TaskGroupContext:
+    """
+    TaskGroup context is used to keep the current TaskGroup when TaskGroup is used as ContextManager.
+    """
+
+    _context_managed_task_group: Optional[TaskGroup] = None
+    _previous_context_managed_task_groups: List[TaskGroup] = []
+
+    @classmethod
+    def push_context_managed_task_group(cls, task_group: TaskGroup):
+        """
+        Push a TaskGroup into the list of managed TaskGroups.
+        """
+        if cls._context_managed_task_group:
+            cls._previous_context_managed_task_groups.append(cls._context_managed_task_group)
+        cls._context_managed_task_group = task_group
+
+    @classmethod
+    def pop_context_managed_task_group(cls) -> Optional[TaskGroup]:
+        """
+        Pops the last TaskGroup from the list of managed TaskGroups and updates the current TaskGroup.
+        """
+        old_task_group = cls._context_managed_task_group
+        if cls._previous_context_managed_task_groups:
+            cls._context_managed_task_group = cls._previous_context_managed_task_groups.pop()
+        else:
+            cls._context_managed_task_group = None
+        return old_task_group
+
+    @classmethod
+    def get_current_task_group(cls, dag) -> Optional[TaskGroup]:
+        """
+        Get the current TaskGroup.
+        """
+        from airflow.models.dag import DagContext
+
+        if not cls._context_managed_task_group:
+            dag = dag or DagContext.get_current_dag()

Review comment:
       ```suggestion
       def get_current_task_group(cls, dag: Optional["DAG"]) -> Optional[TaskGroup]:
           """
           Get the current TaskGroup.
           """
           from airflow.models.dag import DagContext
   
           if not cls._context_managed_task_group:
               dag = dag or DagContext.get_current_dag()
   ```
   Should we make `dag` explicitly `Optional`, given the logic at L387?
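The question hinges on `dag` legitimately being `None`: callers either pass it explicitly or rely on an enclosing `with DAG(...)` block via `DagContext`. That explicit-value-or-context fallback, together with the push/pop stack `TaskGroupContext` keeps for nested groups, can be sketched with a hypothetical `Context` class in which plain strings stand in for groups. This is an illustration under those assumptions, not Airflow's code.

```python
from typing import List, Optional


class Context:
    """Hypothetical stack of active groups, modeled on TaskGroupContext in the diff."""

    _current: Optional[str] = None
    _previous: List[str] = []

    @classmethod
    def push(cls, group: str) -> None:
        # Remember the group that was active before entering the new one.
        if cls._current is not None:
            cls._previous.append(cls._current)
        cls._current = group

    @classmethod
    def pop(cls) -> Optional[str]:
        # Restore the enclosing group, or None when leaving the outermost one.
        old = cls._current
        cls._current = cls._previous.pop() if cls._previous else None
        return old

    @classmethod
    def get_current(cls, explicit: Optional[str] = None) -> Optional[str]:
        # An explicitly passed value wins; otherwise fall back to the stack.
        # Typing the parameter as Optional documents that None is a valid input.
        return explicit or cls._current


seen: List[Optional[str]] = []
Context.push("section_1")
Context.push("section_1.inner")
seen.append(Context.get_current())  # innermost group is active
Context.pop()
seen.append(Context.get_current())  # enclosing group is active again
Context.pop()
print(seen)  # ['section_1.inner', 'section_1']
```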







[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r478394967



##########
File path: airflow/models/baseoperator.py
##########
@@ -382,7 +389,16 @@ def __init__(
                 stacklevel=3
             )
         validate_key(task_id)
-        self.task_id = task_id
+        self.label = task_id
+
+        # Prefix task_id with group_id
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        if task_group:
+            self.task_id = f"{task_group.group_id}.{self.label}" if task_group.group_id else self.label

Review comment:
       Hi @houqp, even without TaskGroup, users cannot define tasks with conflicting names in the same DAG. So if we simply remove the notion of `label` in this PR, we can achieve that. That means the label in Graph View will be the same as `task_id`. And users have to do their own work to maintain unique `task_id` across the entire DAG, and across TaskGroups.
   
   The same example would look like the code below, and the `task_id`s kept in `dag.task_dict` and shown in Graph View would be:
   ```
   section1_task_1
   section1_task_2
   section2_task_1
   section2_task_2
   ```
   
   ```
   def create_section(group_id):
       task1 = DummyOperator(task_id=f"{group_id}_task_1")
       task2 = DummyOperator(task_id=f"{group_id}_task_2")
   
   with DAG(...) as dag:
       with TaskGroup("section1") as section1:
           create_section(section1.group_id)
   
       with TaskGroup("section2") as section2:
           create_section(section2.group_id)
   ```
   
   How does it sound?
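To see why uniqueness becomes the user's job under this no-`label` variant, here is a minimal sketch with a hypothetical `Registry` class standing in for `dag.task_dict` and a hypothetical `DuplicateTaskId` error: IDs are plain strings the caller builds from the group ID, and any repeat is rejected, just as a DAG rejects duplicate task IDs.

```python
from typing import List, Set


class DuplicateTaskId(Exception):
    """Hypothetical stand-in for Airflow's duplicate task ID error."""


class Registry:
    """Hypothetical stand-in for dag.task_dict: task IDs must be unique DAG-wide."""

    def __init__(self) -> None:
        self.task_ids: Set[str] = set()

    def add(self, task_id: str) -> None:
        if task_id in self.task_ids:
            raise DuplicateTaskId(f"task_id '{task_id}' has already been added")
        self.task_ids.add(task_id)


def create_section(registry: Registry, group_id: str) -> List[str]:
    # Without automatic prefixing, the caller builds unique IDs from the group ID.
    ids = [f"{group_id}_task_1", f"{group_id}_task_2"]
    for task_id in ids:
        registry.add(task_id)
    return ids


registry = Registry()
create_section(registry, "section1")
create_section(registry, "section2")
print(sorted(registry.task_ids))
# ['section1_task_1', 'section1_task_2', 'section2_task_1', 'section2_task_2']

try:
    create_section(registry, "section1")  # reusing a group ID collides
except DuplicateTaskId as err:
    print(err)  # task_id 'section1_task_1' has already been added
```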







[GitHub] [airflow] kaxil commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-691026924


   The fix for the static check failure has been merged. Can you rebase once more, please?





[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r472553598



##########
File path: airflow/models/baseoperator.py
##########
@@ -382,7 +389,16 @@ def __init__(
                 stacklevel=3
             )
         validate_key(task_id)
-        self.task_id = task_id
+        self.label = task_id
+
+        # Prefix task_id with group_id
+        task_group = task_group or TaskGroupContext.get_current_task_group(dag)
+        if task_group:
+            self.task_id = f"{task_group.group_id}.{self.label}" if task_group.group_id else self.label

Review comment:
       Hi @houqp, `self.task_id` is only prefixed with `group_id` once the DAG starts using TaskGroup. For existing DAGs that don't use TaskGroup, `self.task_id` stays equal to the passed-in `task_id`, unchanged.
    
   The reason I'm prefixing `self.task_id` with `group_id` is to avoid duplicate task_ids in `dag.task_dict`. For example, if we write code like this:
   ```
   
   def create_section():
       task1 = DummyOperator(task_id="task1")
       task2 = DummyOperator(task_id="task2")
   
   with DAG(...) as dag:
       with TaskGroup("section1") as section1:
           create_section()
   
       with TaskGroup("section2") as section2:
           create_section()
   ```
   
   The actual task_ids stored in `dag.task_dict` will be the following, which avoids duplication:
   ```
   section1.task1
   section1.task2
   section2.task1
   section2.task2
   ```
   The labels users need to see in Graph View are shown below. That is, they don't need to see the fully-qualified task_id
   in the graph because the task is nested within the group.
   
   ```
   section1
       task1
       task2
   section2
       task1
       task2
   ```
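The prefixing rule described above, including the `prefix_group_id` switch from the diff's `group_id` property and `child_id` method, can be sketched without Airflow. `Group` is a hypothetical stand-in for `TaskGroup` that reproduces only the naming logic: a child's ID gets the parent's `group_id` as a prefix when the parent has `prefix_group_id=True` and is not the root, while the Graph View label is always the unprefixed name.

```python
from typing import Optional


class Group:
    """Hypothetical stand-in that reproduces only TaskGroup's ID-prefixing rules."""

    def __init__(self, name: Optional[str], parent: Optional["Group"] = None,
                 prefix_group_id: bool = True) -> None:
        self.label = name  # unprefixed name, used as the Graph View node label
        self.parent = parent
        self.prefix_group_id = prefix_group_id

    @property
    def group_id(self) -> Optional[str]:
        # The root group has no parent and no ID; every other group asks its
        # parent whether (and how) to prefix its name.
        if self.parent is None:
            return self.label
        return self.parent.child_id(self.label)

    def child_id(self, label: Optional[str]) -> Optional[str]:
        # Prefix only when this group opts in and is not the root.
        if self.prefix_group_id and self.group_id:
            return f"{self.group_id}.{label}"
        return label


root = Group(None)
section1 = Group("section1", root)
print(section1.child_id("task1"))  # section1.task1

# Same nesting as test_build_task_group_with_prefix in this PR:
group234 = Group("group234", root, prefix_group_id=False)
group34 = Group("group34", group234)
group4 = Group("group4", group34, prefix_group_id=False)
print(group34.group_id, group4.group_id, group4.label)  # group34 group34.group4 group4
print(group34.child_id("task3"), group4.child_id("task4"))  # group34.task3 task4
```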
   
   
   Alternatively, if we want to avoid changing the semantics of `self.task_id` when using TaskGroup, we can ask users to pass in `label` separately and modify the `task_id` themselves to avoid duplication.
   I find this puts an extra burden on the user. However, maybe there are advantages to it too? What do you think?
   
   ```
   def create_section(group_id):
       task1 = DummyOperator(task_id=f"{group_id}.task1", label="task1")
       task2 = DummyOperator(task_id=f"{group_id}.task2", label="task2")
   
   with DAG(...) as dag:
       with TaskGroup("section1") as section1:
           create_section(section1.group_id)
   
       with TaskGroup("section2") as section2:
           create_section(section2.group_id)
   ```
   







[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r489137031



##########
File path: tests/utils/test_task_group.py
##########
@@ -0,0 +1,529 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pendulum
+import pytest
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.task_group import TaskGroup
+from airflow.www.views import dag_edges, task_group_to_dict
+
+EXPECTED_JSON = {
+    'id': None,
+    'value': {
+        'label': None,
+        'labelStyle': 'fill:#000;',
+        'style': 'fill:CornflowerBlue',
+        'rx': 5,
+        'ry': 5,
+        'clusterLabelPos': 'top',
+    },
+    'tooltip': '',
+    'children': [
+        {
+            'id': 'group234',
+            'value': {
+                'label': 'group234',
+                'labelStyle': 'fill:#000;',
+                'style': 'fill:CornflowerBlue',
+                'rx': 5,
+                'ry': 5,
+                'clusterLabelPos': 'top',
+            },
+            'tooltip': '',
+            'children': [
+                {
+                    'id': 'group234.group34',
+                    'value': {
+                        'label': 'group34',
+                        'labelStyle': 'fill:#000;',
+                        'style': 'fill:CornflowerBlue',
+                        'rx': 5,
+                        'ry': 5,
+                        'clusterLabelPos': 'top',
+                    },
+                    'tooltip': '',
+                    'children': [
+                        {
+                            'id': 'group234.group34.task3',
+                            'value': {
+                                'label': 'task3',
+                                'labelStyle': 'fill:#000;',
+                                'style': 'fill:#e8f7e4;',
+                                'rx': 5,
+                                'ry': 5,
+                            },
+                        },
+                        {
+                            'id': 'group234.group34.task4',
+                            'value': {
+                                'label': 'task4',
+                                'labelStyle': 'fill:#000;',
+                                'style': 'fill:#e8f7e4;',
+                                'rx': 5,
+                                'ry': 5,
+                            },
+                        },
+                        {
+                            'id': 'group234.group34.downstream_join_id',
+                            'value': {
+                                'label': '',
+                                'labelStyle': 'fill:#000;',
+                                'style': 'fill:CornflowerBlue;',
+                                'shape': 'circle',
+                            },
+                        },
+                    ],
+                },
+                {
+                    'id': 'group234.task2',
+                    'value': {
+                        'label': 'task2',
+                        'labelStyle': 'fill:#000;',
+                        'style': 'fill:#e8f7e4;',
+                        'rx': 5,
+                        'ry': 5,
+                    },
+                },
+                {
+                    'id': 'group234.upstream_join_id',
+                    'value': {
+                        'label': '',
+                        'labelStyle': 'fill:#000;',
+                        'style': 'fill:CornflowerBlue;',
+                        'shape': 'circle',
+                    },
+                },
+            ],
+        },
+        {
+            'id': 'task1',
+            'value': {
+                'label': 'task1',
+                'labelStyle': 'fill:#000;',
+                'style': 'fill:#e8f7e4;',
+                'rx': 5,
+                'ry': 5,
+            },
+        },
+        {
+            'id': 'task5',
+            'value': {
+                'label': 'task5',
+                'labelStyle': 'fill:#000;',
+                'style': 'fill:#e8f7e4;',
+                'rx': 5,
+                'ry': 5,
+            },
+        },
+    ],
+}
+
+
+def test_build_task_group_context_manager():
+    execution_date = pendulum.parse("20200101")
+    with DAG("test_build_task_group_context_manager", start_date=execution_date) as dag:
+        task1 = DummyOperator(task_id="task1")
+        with TaskGroup("group234") as group234:
+            _ = DummyOperator(task_id="task2")
+
+            with TaskGroup("group34") as group34:
+                _ = DummyOperator(task_id="task3")
+                _ = DummyOperator(task_id="task4")
+
+        task5 = DummyOperator(task_id="task5")
+        task1 >> group234
+        group34 >> task5
+
+    assert task1.get_direct_relative_ids(upstream=False) == {
+        'group234.group34.task4',
+        'group234.group34.task3',
+        'group234.task2',
+    }
+    assert task5.get_direct_relative_ids(upstream=True) == {
+        'group234.group34.task4',
+        'group234.group34.task3',
+    }
+
+    assert dag.task_group.group_id is None
+    assert dag.task_group.is_root
+    assert set(dag.task_group.children.keys()) == {"task1", "group234", "task5"}
+    assert group34.group_id == "group234.group34"
+
+    assert task_group_to_dict(dag.task_group) == EXPECTED_JSON
+
+
+def test_build_task_group():
+    """
+    This is an alternative syntax to use TaskGroup. It should result in the same TaskGroup
+    as using context manager.
+    """
+    execution_date = pendulum.parse("20200101")
+    dag = DAG("test_build_task_group", start_date=execution_date)
+    task1 = DummyOperator(task_id="task1", dag=dag)
+    group234 = TaskGroup("group234", dag=dag)
+    _ = DummyOperator(task_id="task2", dag=dag, task_group=group234)
+    group34 = TaskGroup("group34", dag=dag, parent_group=group234)
+    _ = DummyOperator(task_id="task3", dag=dag, task_group=group34)
+    _ = DummyOperator(task_id="task4", dag=dag, task_group=group34)
+    task5 = DummyOperator(task_id="task5", dag=dag)
+
+    task1 >> group234
+    group34 >> task5
+
+    assert task_group_to_dict(dag.task_group) == EXPECTED_JSON
+
+
+def extract_node_id(node, include_label=False):
+    ret = {"id": node["id"]}
+    if include_label:
+        ret["label"] = node["value"]["label"]
+    if "children" in node:
+        children = []
+        for child in node["children"]:
+            children.append(extract_node_id(child, include_label=include_label))
+
+        ret["children"] = children
+
+    return ret
+
+
+def test_build_task_group_with_prefix():
+    """
+    Tests that prefix_group_id turns on/off prefixing of task_id with group_id.
+    """
+    execution_date = pendulum.parse("20200101")
+    with DAG("test_build_task_group_with_prefix", start_date=execution_date) as dag:
+        task1 = DummyOperator(task_id="task1")
+        with TaskGroup("group234", prefix_group_id=False) as group234:
+            task2 = DummyOperator(task_id="task2")
+
+            with TaskGroup("group34") as group34:
+                task3 = DummyOperator(task_id="task3")
+
+                with TaskGroup("group4", prefix_group_id=False) as group4:
+                    task4 = DummyOperator(task_id="task4")
+
+        task5 = DummyOperator(task_id="task5")
+        task1 >> group234
+        group34 >> task5
+
+    assert task2.task_id == "task2"
+    assert group34.group_id == "group34"
+    assert task3.task_id == "group34.task3"
+    assert group4.group_id == "group34.group4"
+    assert task4.task_id == "task4"
+    assert task5.task_id == "task5"
+    assert group234.get_child_by_label("task2") == task2
+    assert group234.get_child_by_label("group34") == group34
+    assert group4.get_child_by_label("task4") == task4
+
+    assert extract_node_id(task_group_to_dict(dag.task_group), include_label=True) == {
+        'id': None,
+        'label': None,
+        'children': [
+            {
+                'id': 'group234',
+                'label': 'group234',
+                'children': [
+                    {
+                        'id': 'group34',
+                        'label': 'group34',
+                        'children': [
+                            {
+                                'id': 'group34.group4',
+                                'label': 'group4',
+                                'children': [{'id': 'task4', 'label': 'task4'}],
+                            },
+                            {'id': 'group34.task3', 'label': 'task3'},
+                            {'id': 'group34.downstream_join_id', 'label': ''},
+                        ],
+                    },
+                    {'id': 'task2', 'label': 'task2'},
+                    {'id': 'group234.upstream_join_id', 'label': ''},
+                ],
+            },
+            {'id': 'task1', 'label': 'task1'},
+            {'id': 'task5', 'label': 'task5'},
+        ],
+    }
+
+
+def test_build_task_group_with_task_decorator():
+    """
+    Test that TaskGroup can be used with the @task decorator.
+    """
+    from airflow.operators.python import task
+
+    @task
+    def task_1():
+        print("task_1")
+
+    @task
+    def task_2():
+        return "task_2"
+
+    @task
+    def task_3():
+        return "task_3"
+
+    @task
+    def task_4(task_2_output, task_3_output):
+        print(task_2_output, task_3_output)
+
+    @task
+    def task_5():
+        print("task_5")
+
+    execution_date = pendulum.parse("20200101")
+    with DAG("test_build_task_group_with_task_decorator", start_date=execution_date) as dag:
+        tsk_1 = task_1()
+
+        with TaskGroup("group234") as group234:
+            tsk_2 = task_2()
+            tsk_3 = task_3()
+            tsk_4 = task_4(tsk_2, tsk_3)
+
+        tsk_5 = task_5()
+
+        tsk_1 >> group234 >> tsk_5

Review comment:
       @turbaszek, thanks for #10827. Now that it's merged, I've updated this test to use the more natural syntax.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] yuqian90 closed pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #10153:
URL: https://github.com/apache/airflow/pull/10153









[GitHub] [airflow] turbaszek commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

turbaszek commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-689473660


   > I'm not sure if this is a bug or an `XComArg` feature. Since it's not a `TaskGroup` issue, the change (if any) should probably be made separately in a different PR.
   
   I would say it's a bug that we should definitely fix! I will take a look 👌 
   





[GitHub] [airflow] turbaszek commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

turbaszek commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-690924584


   > > Great work @yuqian90 👏
   > 
   > Thanks @turbaszek .
   > 
   > I looked into the most recent check failures after rebasing onto upstream; I don't think they are related to my change. I'll see whether others report the same issue and wait for a fix upstream.
   > 
   > ```
   > airflow/models/dagrun.py:72: error: unused 'type: ignore' comment
   >             primaryjoin=and_(TI.dag_id == dag_id, TI.execution_date == exe...
   > ```
   
   Indeed, it seems to be unrelated. @mik-laj, any ideas?
   
   Btw, there's also a more severe problem:
   ```
   tests/utils/perf/dags/perf_dag_2.py:1:0: R0401: Cyclic import (airflow.models.baseoperator -> airflow.models.xcom_arg -> airflow.utils.task_group) (cyclic-import)
   ```





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-686493127


   > For tree view, can we not hide the task group and when clicking on the node it expands (I think we can do so now too right? If we click on a node it collapses)
   
   Hi @kaxil, yes, you are right: we can potentially organize Tree View by TaskGroup too. The challenge is that Tree View flattens the graph into a tree, so a task reachable along several paths appears more than once. I have trouble imagining what the collapsible groups should look like in such a tree.
   
   That said, I'm less concerned about Tree View because it's already a collapsible tree; it is just organized by dependency instead of by TaskGroup. How about we build on this incrementally over a few PRs? First, introduce TaskGroup and make it work in Graph View. Then, once people agree on what Tree View should look like with TaskGroup in mind, we put up a PR to incorporate TaskGroup into Tree View.
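To make the "appears more than once" point concrete, here is a toy sketch (not Airflow's Tree View code) that expands a DAG into a tree by following downstream edges from a root. The dict-of-lists graph and the `flatten_to_tree` helper are hypothetical; the traversal direction is an assumption chosen for illustration.

```python
from collections import Counter
from typing import Dict, List


def flatten_to_tree(downstream: Dict[str, List[str]], root: str, depth: int = 0) -> List[str]:
    """Expand a DAG into a tree by walking downstream edges from `root`.

    Returns indented node labels in visit order. A node reachable through
    several paths is emitted once per path, so the tree repeats it.
    """
    lines = ["  " * depth + root]
    for child in downstream.get(root, []):
        lines.extend(flatten_to_tree(downstream, child, depth + 1))
    return lines


# A diamond: a fans out to b and c, which both feed d.
diamond = {"a": ["b", "c"], "b": ["d"], "c": ["d"]}
tree = flatten_to_tree(diamond, "a")
print("\n".join(tree))
counts = Counter(line.strip() for line in tree)
print(counts["d"])  # 2: d shows up once under b and once under c
```

A collapsible TaskGroup would have to decide what to do when its members are scattered across several of these duplicated subtrees, which is the open design question above.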





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-683611508


   Hi @houqp and reviewers, here are the latest changes since your last review:
   
   - No more prefixing of `task_id` per the discussion [here](https://github.com/apache/airflow/pull/10153#discussion_r479914159). The user is responsible for maintaining unique `task_id` across the entire DAG, just like before.
   - Per the discussion in the email "[AIP-34] Rewrite SubDagOperator" with @casassg, if group2 depends on group1, then instead of drawing an edge between every task in group1 and every task in group2, the two functions `task_group_to_dict()` and `dag_edges()` now produce a dummy node for the TaskGroup and join all the edges onto that node. This greatly cuts down the number of edges and speeds up UI rendering many-fold. This is a UI-only change: internally, all tasks in group2 still depend on all tasks in group1. The dummy nodes appear only in the UI; they don't exist internally.
   
   This is what `example_task_group.py` now looks like. The circles between `section_1.1` and `section_1.2` are the dummy nodes I'm talking about. They only appear on the graph and not as actual tasks anywhere.
   
   ![image](https://user-images.githubusercontent.com/6637585/91693296-64b1fb00-eb9d-11ea-80dd-02445604aa85.png)
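A rough sketch of why the join nodes help, assuming (as the `downstream_join_id` / `upstream_join_id` ids in `EXPECTED_JSON` suggest) that the UI routes an upstream group's edges into a `<group_id>.downstream_join_id` node and a downstream group's edges out of a `<group_id>.upstream_join_id` node, with one edge between the two. The functions below are hypothetical and only count edges drawn in the UI; the real task dependencies are unchanged.

```python
from itertools import product
from typing import List, Tuple

Edge = Tuple[str, str]


def full_edges(group1: List[str], group2: List[str]) -> List[Edge]:
    """Draw an edge from every task in group1 to every task in group2."""
    return list(product(group1, group2))


def joined_edges(id1: str, group1: List[str], id2: str, group2: List[str]) -> List[Edge]:
    """Route the same group1 -> group2 dependency through two UI-only join nodes."""
    down_join = f"{id1}.downstream_join_id"  # collects edges leaving group1
    up_join = f"{id2}.upstream_join_id"  # fans edges out into group2
    edges = [(task, down_join) for task in group1]
    edges.append((down_join, up_join))
    edges.extend((up_join, task) for task in group2)
    return edges


group1 = [f"group1.task{i}" for i in range(10)]
group2 = [f"group2.task{i}" for i in range(10)]
print(len(full_edges(group1, group2)))  # 100 edges: 10 * 10
print(len(joined_edges("group1", group1, "group2", group2)))  # 21 edges: 10 + 1 + 10
```

The edge count drops from m * n to m + n + 1, while every task in group1 still reaches every task in group2 through the two join nodes.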
   





[GitHub] [airflow] potiuk commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

potiuk commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-695336597


   > Is anyone interested in using TaskGroup in the next 1.10.* release? We can contribute our own cherry-picked commit too for v1-10-test since we are going to use it anyway ourselves.
   
   I think this would be great, so that people could test it earlier.





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] Alternative proposal: Add TaskGroup as a DAG construction helper and UI concept with example

yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-672891521


   > We should think about how will this be supported in the new rest api so one can easily rebuild the same feature in a new UI.
   
   Hi @houqp, thanks for the suggestions. I think you bring up a good point. Currently, `TaskGroup` is constructed from scratch by the web server based on the `task_group_ids` fields of tasks. This is probably not a very extensible approach. To make this friendlier for the REST API, I think we need to serialize the `TaskGroup` and pass it along with the DAG object. Serializing the TaskGroup also allows adding attributes to `TaskGroup`, such as `ui_color`, `doc`, etc., which can be useful when shown in the web UI. What do you think?
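A minimal sketch of the serialization idea, assuming the group tree is turned into plain dicts that survive a JSON round trip so that a REST client could rebuild it without re-deriving groups from task ids. `GroupNode`, `serialize_group` and `deserialize_group` are hypothetical; this is not Airflow's `SerializedTaskGroup`. The `ui_color` default mirrors the `CornflowerBlue` seen in `EXPECTED_JSON`, and `tooltip` stands in for extra UI attributes such as `doc`.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class GroupNode:
    """Hypothetical stand-in for a TaskGroup: an id, UI attributes and children."""
    group_id: Optional[str]
    ui_color: str = "CornflowerBlue"
    tooltip: str = ""
    task_ids: List[str] = field(default_factory=list)
    children: List["GroupNode"] = field(default_factory=list)


def serialize_group(group: GroupNode) -> Dict[str, Any]:
    """Turn the group tree into plain dicts/lists so json.dumps can handle it."""
    return {
        "group_id": group.group_id,
        "ui_color": group.ui_color,
        "tooltip": group.tooltip,
        "task_ids": list(group.task_ids),
        "children": [serialize_group(child) for child in group.children],
    }


def deserialize_group(data: Dict[str, Any]) -> GroupNode:
    """Rebuild the tree from the dicts produced by serialize_group."""
    return GroupNode(
        group_id=data["group_id"],
        ui_color=data["ui_color"],
        tooltip=data["tooltip"],
        task_ids=list(data["task_ids"]),
        children=[deserialize_group(child) for child in data["children"]],
    )


root = GroupNode(None, task_ids=["task1", "task5"], children=[
    GroupNode("group234", task_ids=["group234.task2"], children=[
        GroupNode("group234.group34", task_ids=["group234.group34.task3", "group234.group34.task4"]),
    ]),
])
payload = json.dumps(serialize_group(root))
restored = deserialize_group(json.loads(payload))
print(restored == root)  # True: dataclass equality compares the whole tree
```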





[GitHub] [airflow] turbaszek commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

turbaszek commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r484262568



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -625,3 +647,69 @@ def from_dict(cls, serialized_obj: dict) -> 'SerializedDAG':
         if ver != cls.SERIALIZER_VERSION:
             raise ValueError("Unsure how to deserialize version {!r}".format(ver))
         return cls.deserialize_dag(serialized_obj['dag'])
+
+
+class SerializedTaskGroup(TaskGroup, BaseSerialization):
+    """
+    A JSON serializable representation of TaskGroup.
+    """
+    @classmethod
+    def serialize_task_group(cls, task_group: TaskGroup) -> Optional[Union[Dict[str, Any]]]:
+        """Serializes TaskGroup into a JSON object.

Review comment:
       ```suggestion
           """
           Serializes TaskGroup into a JSON object.
   ```







[GitHub] [airflow] potiuk edited a comment on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

potiuk edited a comment on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-691054773


   Yeah. This is unfortunate, but I think the cause is not the "TYPE_CHECKING" problem but the internal architecture of our "core", which is overcomplicated. Unfortunately, we still have those cyclic dependencies in the project (they are not really cyclic imports; the name in Pylint is a bit misleading).
   
   IMHO, while "technically" those cyclic imports are not triggered in our service (because of the order in which classes are usually imported when Airflow starts), they make the whole "core" part of Airflow much more complex than it should be, and difficult to reason about (and to check statically). The single-responsibility principle is still not followed in quite a few classes, and it's actually quite difficult to disentangle them. I and others have tried several times, and so far each attempt causes a chain reaction of changes that makes it rather difficult to accomplish.
   
   I am going to make a few more attempts at this (hopefully before 2.0, or while preparing some of the release candidates).





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-686895896


   Updated the title and summary to make it clearer the PR is ready for review.





[GitHub] [airflow] yuqian90 commented on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

yuqian90 commented on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-690919898









[GitHub] [airflow] yuqian90 commented on a change in pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

yuqian90 commented on a change in pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#discussion_r485543331



##########
File path: tests/utils/test_task_group.py
##########
@@ -0,0 +1,523 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pendulum
+import pytest
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.task_group import TaskGroup
+from airflow.www.views import dag_edges, task_group_to_dict
+
+EXPECTED_JSON = {
+    'id': None,
+    'value': {
+        'label': None,
+        'labelStyle': 'fill:#000;',
+        'style': 'fill:CornflowerBlue',
+        'rx': 5,
+        'ry': 5,
+        'clusterLabelPos': 'top',
+    },
+    'tooltip': '',
+    'children': [
+        {
+            'id': 'group234',
+            'value': {
+                'label': 'group234',
+                'labelStyle': 'fill:#000;',
+                'style': 'fill:CornflowerBlue',
+                'rx': 5,
+                'ry': 5,
+                'clusterLabelPos': 'top',
+            },
+            'tooltip': '',
+            'children': [
+                {
+                    'id': 'group234.group34',
+                    'value': {
+                        'label': 'group34',
+                        'labelStyle': 'fill:#000;',
+                        'style': 'fill:CornflowerBlue',
+                        'rx': 5,
+                        'ry': 5,
+                        'clusterLabelPos': 'top',
+                    },
+                    'tooltip': '',
+                    'children': [
+                        {
+                            'id': 'group234.group34.task3',
+                            'value': {
+                                'label': 'task3',
+                                'labelStyle': 'fill:#000;',
+                                'style': 'fill:#e8f7e4;',
+                                'rx': 5,
+                                'ry': 5,
+                            },
+                        },
+                        {
+                            'id': 'group234.group34.task4',
+                            'value': {
+                                'label': 'task4',
+                                'labelStyle': 'fill:#000;',
+                                'style': 'fill:#e8f7e4;',
+                                'rx': 5,
+                                'ry': 5,
+                            },
+                        },
+                        {
+                            'id': 'group234.group34.downstream_join_id',
+                            'value': {
+                                'label': '',
+                                'labelStyle': 'fill:#000;',
+                                'style': 'fill:CornflowerBlue;',
+                                'shape': 'circle',
+                            },
+                        },
+                    ],
+                },
+                {
+                    'id': 'group234.task2',
+                    'value': {
+                        'label': 'task2',
+                        'labelStyle': 'fill:#000;',
+                        'style': 'fill:#e8f7e4;',
+                        'rx': 5,
+                        'ry': 5,
+                    },
+                },
+                {
+                    'id': 'group234.upstream_join_id',
+                    'value': {
+                        'label': '',
+                        'labelStyle': 'fill:#000;',
+                        'style': 'fill:CornflowerBlue;',
+                        'shape': 'circle',
+                    },
+                },
+            ],
+        },
+        {
+            'id': 'task1',
+            'value': {
+                'label': 'task1',
+                'labelStyle': 'fill:#000;',
+                'style': 'fill:#e8f7e4;',
+                'rx': 5,
+                'ry': 5,
+            },
+        },
+        {
+            'id': 'task5',
+            'value': {
+                'label': 'task5',
+                'labelStyle': 'fill:#000;',
+                'style': 'fill:#e8f7e4;',
+                'rx': 5,
+                'ry': 5,
+            },
+        },
+    ],
+}
+
+
+def test_build_task_group_context_manager():
+    execution_date = pendulum.parse("20200101")
+    with DAG("test_build_task_group_context_manager", start_date=execution_date) as dag:
+        task1 = DummyOperator(task_id="task1")
+        with TaskGroup("group234") as group234:
+            _ = DummyOperator(task_id="task2")
+
+            with TaskGroup("group34") as group34:
+                _ = DummyOperator(task_id="task3")
+                _ = DummyOperator(task_id="task4")
+
+        task5 = DummyOperator(task_id="task5")
+        task1 >> group234
+        group34 >> task5
+
+    assert task1.get_direct_relative_ids(upstream=False) == {
+        'group234.group34.task4',
+        'group234.group34.task3',
+        'group234.task2',
+    }
+    assert task5.get_direct_relative_ids(upstream=True) == {
+        'group234.group34.task4',
+        'group234.group34.task3',
+    }
+
+    assert dag.task_group.group_id is None
+    assert dag.task_group.is_root
+    assert set(dag.task_group.children.keys()) == {"task1", "group234", "task5"}
+    assert group34.group_id == "group234.group34"
+
+    assert task_group_to_dict(dag.task_group) == EXPECTED_JSON
+
+
+def test_build_task_group():
+    """
+    This is an alternative syntax to use TaskGroup. It should result in the same TaskGroup
+    as using context manager.
+    """
+    execution_date = pendulum.parse("20200101")
+    dag = DAG("test_build_task_group", start_date=execution_date)
+    task1 = DummyOperator(task_id="task1", dag=dag)
+    group234 = TaskGroup("group234", dag=dag)
+    _ = DummyOperator(task_id="task2", dag=dag, task_group=group234)
+    group34 = TaskGroup("group34", dag=dag, parent_group=group234)
+    _ = DummyOperator(task_id="task3", dag=dag, task_group=group34)
+    _ = DummyOperator(task_id="task4", dag=dag, task_group=group34)
+    task5 = DummyOperator(task_id="task5", dag=dag)
+
+    task1 >> group234
+    group34 >> task5
+
+    assert task_group_to_dict(dag.task_group) == EXPECTED_JSON
+
+
+def extract_node_id(node, include_label=False):
+    ret = {"id": node["id"]}
+    if include_label:
+        ret["label"] = node["value"]["label"]
+    if "children" in node:
+        children = []
+        for child in node["children"]:
+            children.append(extract_node_id(child, include_label=include_label))
+
+        ret["children"] = children
+
+    return ret
+
+
+def test_build_task_group_with_prefix():
+    """
+    Tests that prefix_group_id turns on/off prefixing of task_id with group_id.
+    """
+    execution_date = pendulum.parse("20200101")
+    with DAG("test_build_task_group_with_prefix", start_date=execution_date) as dag:
+        task1 = DummyOperator(task_id="task1")
+        with TaskGroup("group234", prefix_group_id=False) as group234:
+            task2 = DummyOperator(task_id="task2")
+
+            with TaskGroup("group34") as group34:
+                task3 = DummyOperator(task_id="task3")
+
+                with TaskGroup("group4", prefix_group_id=False) as group4:
+                    task4 = DummyOperator(task_id="task4")
+
+        task5 = DummyOperator(task_id="task5")
+        task1 >> group234 >> task5
+
+    assert task2.task_id == "task2"
+    assert group34.group_id == "group34"
+    assert task3.task_id == "group34.task3"
+    assert group4.group_id == "group34.group4"
+    assert task4.task_id == "task4"
+    assert task5.task_id == "task5"
+    assert group234.get_child_by_label("task2") == task2
+    assert group234.get_child_by_label("group34") == group34
+    assert group4.get_child_by_label("task4") == task4
+
+    assert extract_node_id(task_group_to_dict(dag.task_group), include_label=True) == {
+        'id': None,
+        'label': None,
+        'children': [
+            {
+                'id': 'group234',
+                'label': 'group234',
+                'children': [
+                    {
+                        'id': 'group34',
+                        'label': 'group34',
+                        'children': [
+                            {
+                                'id': 'group34.group4',
+                                'label': 'group4',
+                                'children': [{'id': 'task4', 'label': 'task4'}],
+                            },
+                            {'id': 'group34.task3', 'label': 'task3'},
+                            {'id': 'group34.downstream_join_id', 'label': ''},
+                        ],
+                    },
+                    {'id': 'task2', 'label': 'task2'},
+                    {'id': 'group234.upstream_join_id', 'label': ''},
+                ],
+            },
+            {'id': 'task1', 'label': 'task1'},
+            {'id': 'task5', 'label': 'task5'},
+        ],
+    }
+
+
+def test_build_task_group_with_task_decorator():
+    """
+    Test that TaskGroup can be used with the @task decorator.
+    """
+    from airflow.operators.python import task
+
+    @task
+    def task_1():
+        print("task_1")
+
+    @task
+    def task_2():
+        return "task_2"
+
+    @task
+    def task_3():
+        return "task_3"
+
+    @task
+    def task_4(task_2_output, task_3_output):
+        print(task_2_output, task_3_output)
+
+    @task
+    def task_5():
+        print("task_5")
+
+    execution_date = pendulum.parse("20200101")
+    with DAG("test_build_task_group_with_task_decorator", start_date=execution_date) as dag:
+        tsk_1 = task_1()
+
+        with TaskGroup("group234") as group234:
+            tsk_2 = task_2()
+            tsk_3 = task_3()
+            _ = task_4(tsk_2, tsk_3)
+
+        tsk_5 = task_5()
+
+        tsk_1 >> group234
+        tsk_5 << group234

Review comment:
       Sure that's done.
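The `test_build_task_group_with_prefix` assertions in the diff above imply a simple rule: a child's id is prefixed with its parent group's id only when the parent has `prefix_group_id=True`, and the root group (id `None`) never prefixes. The toy sketch below reproduces those asserted ids, assuming a class-level stack to track the enclosing `with` block. `ToyTaskGroup` and `ToyTask` are hypothetical miniatures, not Airflow's implementation; `get_child_by_label` only mirrors the name used in the test.

```python
from typing import Dict, List, Optional, Union


class ToyTaskGroup:
    """Hypothetical miniature of TaskGroup: finds its parent via a stack of
    active `with` blocks and derives ids from prefix_group_id."""

    _stack: List["ToyTaskGroup"] = []

    def __init__(self, label: Optional[str], prefix_group_id: bool = True):
        self.parent = ToyTaskGroup._stack[-1] if ToyTaskGroup._stack else None
        self.label = label
        self.prefix_group_id = prefix_group_id
        self.children: Dict[str, Union["ToyTaskGroup", "ToyTask"]] = {}
        # The parent decides whether this group's id carries its prefix.
        self.group_id = self.parent.child_id(label) if self.parent is not None else label
        if self.parent is not None:
            self.parent.children[label] = self

    def child_id(self, label: str) -> str:
        # Prefix children only if this group asks for it and has an id at all.
        if self.prefix_group_id and self.group_id:
            return f"{self.group_id}.{label}"
        return label

    def get_child_by_label(self, label: str) -> Union["ToyTaskGroup", "ToyTask"]:
        return self.children[label]

    def __enter__(self) -> "ToyTaskGroup":
        ToyTaskGroup._stack.append(self)
        return self

    def __exit__(self, *exc) -> None:
        ToyTaskGroup._stack.pop()


class ToyTask:
    """Hypothetical task; must be created inside some ToyTaskGroup block."""

    def __init__(self, label: str):
        group = ToyTaskGroup._stack[-1]
        self.task_id = group.child_id(label)
        group.children[label] = self


# Same nesting as test_build_task_group_with_prefix; the outer group plays the DAG root.
with ToyTaskGroup(None) as root:
    task1 = ToyTask("task1")
    with ToyTaskGroup("group234", prefix_group_id=False) as group234:
        task2 = ToyTask("task2")
        with ToyTaskGroup("group34") as group34:
            task3 = ToyTask("task3")
            with ToyTaskGroup("group4", prefix_group_id=False) as group4:
                task4 = ToyTask("task4")
    task5 = ToyTask("task5")

print([t.task_id for t in (task1, task2, task3, task4, task5)])
# ['task1', 'task2', 'group34.task3', 'task4', 'task5']
print(group34.group_id, group4.group_id)  # group34 group34.group4
```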







[GitHub] [airflow] yuqian90 edited a comment on pull request #10153: [AIP-34] TaskGroup: A UI task grouping concept as an alternative to SubDagOperator

yuqian90 edited a comment on pull request #10153:
URL: https://github.com/apache/airflow/pull/10153#issuecomment-689468941


   > The lack of indentation was on purpose to check if `TaskGroup` works well with `>>` between `XComArg` which is returned by invoking `my_task()`:
   > https://github.com/apache/airflow/blob/aaf56f9816ed72e18a3215183c185d379b4e4247/airflow/operators/python.py#L291
   > 
   > At least typing here is missing `TaskGroup`:
   > https://github.com/apache/airflow/blob/aaf56f9816ed72e18a3215183c185d379b4e4247/airflow/models/xcom_arg.py#L118-L130
   
   
   Hi @turbaszek, thanks for pointing this out. To demonstrate `TaskGroup` working with `@task`, and to make sure it keeps working, I added `test_build_task_group_with_task_decorator` to `test_task_group.py`. Please see if that's in line with what you have in mind.
   
   That said, I noticed two small issues that are not related to `TaskGroup`, but rather to `XComArg`:
   
   1. This function inside `xcom_arg.py` returns `self` instead of `other` (which is what `BaseOperator` returns).
   
   ```
       def __rshift__(self, other):
           """
           Implements XComArg >> op
           """
           self.set_downstream(other)
           return self
   ```
   
   2. `BaseOperator` `__lshift__` and `__rshift__` do not handle `XComArg`.
   
   As a result, it effectively prevents us from chaining operators like this:
   ```
   tsk_1 >> group234 >> tsk_5
   ```
   
   I.e. we would intuitively expect the above line to put `tsk_5` downstream of `group234`. But because `tsk_1` is an `XComArg`, what this line actually does is very surprising: it puts both `group234` and `tsk_5` downstream of `tsk_1`.
   So we are forced to write the following, which is what I added in the test:
   ```
   tsk_1 >> group234
   tsk_5 << group234
   ```
   
   I'm not sure if this is a bug or an `XComArg` feature. Since it's not a `TaskGroup` issue, the change (if any) should probably be made separately in a different PR.
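The chaining problem comes down to what `__rshift__` returns, because `a >> b >> c` evaluates as `(a >> b) >> c`. The toy sketch below uses a hypothetical `Node` class (not Airflow's `XComArg` or `BaseOperator`) whose `return_other` flag switches between the two behaviours described above, and records which edges each one produces.

```python
from typing import List


class Node:
    """Hypothetical stand-in for an operator/XComArg that records downstream edges."""

    def __init__(self, name: str, return_other: bool):
        self.name = name
        self.return_other = return_other
        self.downstream: List["Node"] = []

    def __rshift__(self, other: "Node") -> "Node":
        self.downstream.append(other)
        # Returning `other` (BaseOperator-style) makes the next `>>` start from `other`;
        # returning `self` (the XComArg behaviour quoted above) restarts from the left operand.
        return other if self.return_other else self


def chain_edges(return_other: bool) -> List[str]:
    """Evaluate `tsk_1 >> group234 >> tsk_5` and list the resulting edges."""
    a, b, c = (Node(name, return_other) for name in ("tsk_1", "group234", "tsk_5"))
    a >> b >> c  # evaluated only for its side effect of recording edges
    return [f"{n.name} >> {d.name}" for n in (a, b, c) for d in n.downstream]


print(chain_edges(return_other=False))  # ['tsk_1 >> group234', 'tsk_1 >> tsk_5']
print(chain_edges(return_other=True))   # ['tsk_1 >> group234', 'group234 >> tsk_5']
```

Only the second result matches the intuitive reading of the chain, which is why returning `other` is the conventional choice.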

