Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/03 04:58:56 UTC

[GitHub] [airflow] xinbinhuang opened a new pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

xinbinhuang opened a new pull request #14640:
URL: https://github.com/apache/airflow/pull/14640


   
   closes: #14563
   
   This PR enables ExternalTaskSensor to also wait for an external TaskGroup.
   
   The implementation retrieves the external DAG from the DagBag and checks whether the TaskGroup exists. If it does, the sensor queries the states of all tasks within that TaskGroup on each poke and waits for them.
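
A minimal sketch of the poking logic described above, using plain dictionaries instead of the DagBag and the metadata database. The names `TASK_GROUPS` and `poke_task_group` are hypothetical and are not part of Airflow or of this PR.

```python
from typing import Dict, List, Sequence

# Hypothetical in-memory stand-in for "look up the TaskGroup in the external DAG".
TASK_GROUPS: Dict[str, List[str]] = {"extract": ["extract.a", "extract.b"]}


def poke_task_group(
    group_id: str,
    states: Dict[str, str],
    allowed: Sequence[str] = ("success",),
) -> bool:
    """Return True once every task in the group is in an allowed state."""
    if group_id not in TASK_GROUPS:
        # Fail right away instead of poking for a group that does not exist.
        raise ValueError(f"The external task group {group_id} does not exist.")
    return all(states.get(task_id) in allowed for task_id in TASK_GROUPS[group_id])


# One task is still running, so the sensor keeps waiting.
print(poke_task_group("extract", {"extract.a": "success", "extract.b": "running"}))
# All tasks in the group succeeded, so the sensor can finish.
print(poke_task_group("extract", {"extract.a": "success", "extract.b": "success"}))
```

The real sensor counts matching rows in the database rather than inspecting a dictionary, but the condition it waits for is the same: every task in the group has reached an allowed state.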
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r589854984



##########
File path: airflow/models/dag.py
##########
@@ -745,10 +745,18 @@ def tasks(self, val):
     def task_ids(self) -> List[str]:
         return list(self.task_dict.keys())
 
+    @property
+    def task_group_dict(self) -> Dict[str, "TaskGroup"]:
+        return {k: v for k, v in self._task_group.get_task_group_dict().items() if k is not None}

Review comment:
       Use the public `.task_group` property here, since it does the same:
   
   ```suggestion
           return {k: v for k, v in self.task_group.get_task_group_dict().items() if k is not None}
   ```
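
To illustrate why the comprehension filters on `k is not None`, here is a small sketch with a toy class. `ToyTaskGroup` is hypothetical and only imitates the shape of a nested group tree; it assumes, as the filter in the diff implies, that the root group is stored under the key `None`.

```python
from typing import Dict, Optional


class ToyTaskGroup:
    """Toy stand-in for a TaskGroup; not the Airflow class."""

    def __init__(self, group_id: Optional[str]) -> None:
        self.group_id = group_id
        self.children: Dict[Optional[str], "ToyTaskGroup"] = {}

    def add(self, child: "ToyTaskGroup") -> "ToyTaskGroup":
        self.children[child.group_id] = child
        return child

    def get_task_group_dict(self) -> Dict[Optional[str], "ToyTaskGroup"]:
        """Flatten this group and all nested groups into one mapping."""
        result: Dict[Optional[str], "ToyTaskGroup"] = {self.group_id: self}
        for child in self.children.values():
            result.update(child.get_task_group_dict())
        return result


root = ToyTaskGroup(None)  # assumption: the root group has no id
section = root.add(ToyTaskGroup("section_1"))
section.add(ToyTaskGroup("section_1.inner"))

# Same comprehension as in the suggestion: drop the anonymous root entry.
task_group_dict = {k: v for k, v in root.get_task_group_dict().items() if k is not None}
print(sorted(task_group_dict))  # ['section_1', 'section_1.inner']
```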










[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r588860807



##########
File path: airflow/sensors/external_task.py
##########
@@ -134,20 +146,23 @@ def __init__(
         self.execution_delta = execution_delta
         self.execution_date_fn = execution_date_fn
         self.external_dag_id = external_dag_id
+        self.external_task_group_id = external_task_group_id
         self.external_task_id = external_task_id
         self.check_existence = check_existence

Review comment:
       `check_existence` is `False` by default, which may make sense for an external DAG or an external task. But for an external task group, the sensor has to look up an existing DAG in order to get the list of task_ids.
   
   I wonder if we can change the default to `True`, or even make `check_existence` mandatory? This would give more useful errors when the external task/DAG does not exist, and it would make the behavior consistent with `external_task_group`. Also, I can't think of a reason to have a sensor wait for an object that doesn't exist until it times out.
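
A rough sketch of the difference being discussed, assuming a toy sensor loop. `KNOWN_TASKS`, `poke` and `run_sensor` are hypothetical names for illustration only, and the "timeout" is simulated with a fixed number of pokes.

```python
KNOWN_TASKS = {"upstream_dag": {"load", "transform"}}  # hypothetical metadata


def poke(dag_id: str, task_id: str, check_existence: bool) -> bool:
    """One poke. In this toy the awaited task never finishes."""
    if check_existence and task_id not in KNOWN_TASKS.get(dag_id, set()):
        raise LookupError(f"The external task {task_id} in DAG {dag_id} does not exist.")
    return False


def run_sensor(dag_id: str, task_id: str, check_existence: bool, max_pokes: int = 3) -> str:
    """Poke up to max_pokes times and report how the sensor ended."""
    for _ in range(max_pokes):
        try:
            if poke(dag_id, task_id, check_existence):
                return "done"
        except LookupError as err:
            return f"failed fast: {err}"
    return "timed out"


# A misspelled task id: without the check the sensor only ends at the timeout.
print(run_sensor("upstream_dag", "typo_task", check_existence=False))  # timed out
# With the check, the first poke surfaces the actual problem.
print(run_sensor("upstream_dag", "typo_task", check_existence=True))
```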







[GitHub] [airflow] xinbinhuang commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-812939170


   The test failure is in the K8S image build job, and I don't think it is related to this PR.





[GitHub] [airflow] github-actions[bot] closed pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #14640:
URL: https://github.com/apache/airflow/pull/14640


   





[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r608782468



##########
File path: airflow/sensors/external_task.py
##########
@@ -164,18 +184,23 @@ def poke(self, context, session=None):
         if self.failed_states:
             count_failed = self.get_count(dttm_filter, session, self.failed_states)
 
-        if count_failed == len(dttm_filter):
+        if count_failed > 0:

Review comment:
       I don't think anything more than an entry in `UPDATING.md` is necessary. I think the only situation where you will have multiple counts is when `execution_date_fn` returns more than one execution date to wait for. However, the original behavior gets you into a weird state when only some of the TIs fail, e.g. one fails and one succeeds: the sensor neither fails nor succeeds and eventually times out. IMHO, this is more of a bug than intended behavior. WDYT?
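
The "weird state" can be worked through with two execution dates, one failed and one successful. This is only a sketch of the two conditions from the hunk; the helper names `old_should_fail` and `new_should_fail` are made up for the comparison.

```python
def old_should_fail(count_failed: int, n_dates: int) -> bool:
    """Original check: fail only if every awaited run failed."""
    return count_failed == n_dates


def new_should_fail(count_failed: int, n_dates: int) -> bool:
    """Proposed check: fail as soon as any awaited run failed."""
    return count_failed > 0


states = ["failed", "success"]  # states for two awaited execution dates
n_dates = len(states)
count_failed = states.count("failed")
count_allowed = states.count("success")

# The success condition can never become true because one run already failed.
print(count_allowed == n_dates)
# The old failure condition is not true either, so the sensor pokes until timeout.
print(old_should_fail(count_failed, n_dates))
# The new condition fails the sensor on the first poke that sees the failure.
print(new_should_fail(count_failed, n_dates))
```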
















[GitHub] [airflow] github-actions[bot] commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-811459419


   [The Workflow run](https://github.com/apache/airflow/actions/runs/706274159) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.











[GitHub] [airflow] github-actions[bot] commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-811459103


   [The Workflow run](https://github.com/apache/airflow/actions/runs/706272731) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-895632270


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] github-actions[bot] commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-791913797


   [The Workflow run](https://github.com/apache/airflow/actions/runs/627104621) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] turbaszek commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r608038176



##########
File path: airflow/sensors/external_task.py
##########
@@ -164,18 +184,23 @@ def poke(self, context, session=None):
         if self.failed_states:
             count_failed = self.get_count(dttm_filter, session, self.failed_states)
 
-        if count_failed == len(dttm_filter):
+        if count_failed > 0:

Review comment:
       Should we add this comment in code?







[GitHub] [airflow] xinbinhuang commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-820511244


   @KevinYang21 @kaxil @yuqian90 @ashb  Gentle reminder. Please take a look and give some feedback if you have time :)





[GitHub] [airflow] turbaszek commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r608037670



##########
File path: airflow/models/dag.py
##########
@@ -1525,8 +1533,11 @@ def filter_task_group(group, parent_group):
 
         return dag
 
+    def has_task_group(self, group_id: str):

Review comment:
       ```suggestion
       def has_task_group(self, group_id: str) -> bool:
   ```







[GitHub] [airflow] github-actions[bot] commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-980810109


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r603840023



##########
File path: airflow/sensors/external_task.py
##########
@@ -206,29 +232,48 @@ def get_count(self, dttm_filter, session, states) -> int:
         """
         TI = TaskInstance
         DR = DagRun
+
         if self.external_task_id:
             count = (
-                session.query(func.count())  # .count() is inefficient
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id == self.external_task_id)
                 .scalar()
             )
-        else:
+        elif self.external_task_group_id:
+            external_task_group_task_ids = self.get_external_task_group_task_ids(session)
             count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),  # pylint: disable=no-member
-                    DR.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id.in_(external_task_group_task_ids))
                 .scalar()
-            )
+            ) / len(external_task_group_task_ids)
+        else:
+            count = self._count_query(DR, session, states, dttm_filter).scalar()
+
         return count
 
+    def _count_query(self, model, session, states, dttm_filter) -> "Query":
+        query = session.query(func.count()).filter(  # .count() is inefficient
+            model.dag_id == self.external_dag_id,
+            model.state.in_(states),  # pylint: disable=no-member
+            model.execution_date.in_(dttm_filter),
+        )
+
+        return query
+
+    def get_external_task_group_task_ids(self, session):
+        """Return task ids for the external TaskGroup"""
+        refreshed_dag_info = DagBag(read_dags_from_db=True).get_dag(self.external_dag_id, session)
+        task_group: Optional["TaskGroup"] = refreshed_dag_info.task_group_dict.get(
+            self.external_task_group_id
+        )
+        if not task_group:
+            raise AirflowException(
+                f"The external task group {self.external_task_group_id} in "
+                f"DAG {self.external_dag_id} does not exist."
+            )
+        task_ids = [task.task_id for task in task_group]
+        return task_ids
+

Review comment:
       This is the main piece: it retrieves the list of tasks for a TaskGroup. I believe `read_dags_from_db=True` is safe to use here because DAG serialization is enabled by default in 2.0. Am I correct?
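
The division by `len(external_task_group_task_ids)` in the hunk above normalizes the task-instance count to "number of fully matching runs", so that it can be compared with `len(dttm_filter)`. A small worked sketch of that arithmetic follows; `group_count` is a hypothetical helper, not code from the PR.

```python
from typing import List


def group_count(matching_task_instances: int, task_ids: List[str]) -> float:
    """Normalize a task-instance count by the number of tasks in the group."""
    return matching_task_instances / len(task_ids)


task_ids = ["g.a", "g.b", "g.c"]  # three tasks in the group
n_dates = 2                       # two execution dates to wait for

# All 3 tasks succeeded on both dates: 6 / 3 == 2.0, which equals n_dates.
print(group_count(6, task_ids) == n_dates)
# One task instance is still missing: 5 / 3 is not 2, so the sensor keeps waiting.
print(group_count(5, task_ids) == n_dates)
```

Two caveats follow from the arithmetic: `/` yields a float even though `get_count` is annotated as returning `int`, and an empty group would raise `ZeroDivisionError`.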










[GitHub] [airflow] potiuk commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-812944603


   Just fixed the K8S problem in #15182. Can you please rebase?





[GitHub] [airflow] github-actions[bot] commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-810080093


   [The Workflow run](https://github.com/apache/airflow/actions/runs/700950325) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-1028475349


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] closed pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #14640:
URL: https://github.com/apache/airflow/pull/14640


   





[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r608782468



##########
File path: airflow/sensors/external_task.py
##########
@@ -164,18 +184,23 @@ def poke(self, context, session=None):
         if self.failed_states:
             count_failed = self.get_count(dttm_filter, session, self.failed_states)
 
-        if count_failed == len(dttm_filter):
+        if count_failed > 0:

Review comment:
       I don't think anything more than an entry in `UPDATING.md` is necessary. I think the only situation where you will have multiple counts is when `execution_date_fn` returns more than one execution date to wait for. However, the original behavior gets you into a weird state when only some of the TIs fail, e.g. one fails and one succeeds, and the sensor ends up timing out. IMHO, this is more like a bug than intended behavior.
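
To make the partial-failure case concrete, here is a minimal sketch of the two conditions in plain Python. The helper names (`old_poke_outcome`, `new_poke_outcome`) and the outcome strings are hypothetical; they only model the comparisons from the diff, not the sensor's actual control flow, and they assume `failed_states` is set.

```python
def old_poke_outcome(count_allowed: int, count_failed: int, n_dates: int) -> str:
    """Model of the original check: fail only when every execution date failed."""
    if count_failed == n_dates:
        return "fail"
    return "success" if count_allowed == n_dates else "keep poking"


def new_poke_outcome(count_allowed: int, count_failed: int, n_dates: int) -> str:
    """Model of the proposed check: fail as soon as any execution date failed."""
    if count_failed > 0:
        return "fail"
    return "success" if count_allowed == n_dates else "keep poking"


# Two execution dates: one external TI succeeded, one failed.
print(old_poke_outcome(count_allowed=1, count_failed=1, n_dates=2))
print(new_poke_outcome(count_allowed=1, count_failed=1, n_dates=2))
```

With the original condition the sketch never reaches "fail" or "success" in this case, so a real sensor would keep poking until it times out; with the proposed condition it fails right away.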







[GitHub] [airflow] eladkal commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
eladkal commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-997460724


   > Thanks for the nudge! Will try to wrap it up before the holidays hit.
   
   Great :) so re-opening so it won't be missed 





[GitHub] [airflow] eladkal commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
eladkal commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-997456276


   @xinbinhuang will you have time to complete it?





[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r603839218



##########
File path: airflow/sensors/external_task.py
##########
@@ -164,18 +184,23 @@ def poke(self, context, session=None):
         if self.failed_states:
             count_failed = self.get_count(dttm_filter, session, self.failed_states)
 
-        if count_failed == len(dttm_filter):
+        if count_failed > 0:

Review comment:
       Here I am assuming that as long as there is at least one external task failure, we will want to fail the sensor. This changes the original behavior, but I think it is the better behavior?







[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r589825450



##########
File path: airflow/sensors/external_task.py
##########
@@ -206,29 +232,48 @@ def get_count(self, dttm_filter, session, states) -> int:
         """
         TI = TaskInstance
         DR = DagRun
+
         if self.external_task_id:
             count = (
-                session.query(func.count())  # .count() is inefficient
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id == self.external_task_id)
                 .scalar()
             )
-        else:
+        elif self.external_task_group_id:
+            external_task_group_task_ids = self.get_external_task_group_task_ids(session)
             count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),  # pylint: disable=no-member
-                    DR.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id.in_(external_task_group_task_ids))
                 .scalar()
-            )
+            ) / len(external_task_group_task_ids)
+        else:
+            count = self._count_query(DR, session, states, dttm_filter).scalar()
+
         return count
 
+    def _count_query(self, model, session, states, dttm_filter) -> "Query":
+        query = session.query(func.count()).filter(  # .count() is inefficient
+            model.dag_id == self.external_dag_id,
+            model.state.in_(states),  # pylint: disable=no-member
+            model.execution_date.in_(dttm_filter),
+        )
+
+        return query
+
+    def get_external_task_group_task_ids(self, session):
+        """Return task ids for the external TaskGroup"""
+        refreshed_dag_info = DagBag().get_dag(self.external_dag_id, session)
+        task_group: Optional["TaskGroup"] = refreshed_dag_info.task_group_dict.get(
+            self.external_task_group_id
+        )
+        if task_group is None:
+            raise AirflowException(
+                f'The external task group {self.external_task_group_id} in '
+                f'DAG {self.external_dag_id} does not exist.'
+            )
+        task_ids = [task.task_id for task in task_group]
+        return task_ids

Review comment:
       This is the main piece, where you retrieve the list of tasks for a `TaskGroup`.







[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r588860807



##########
File path: airflow/sensors/external_task.py
##########
@@ -134,20 +146,23 @@ def __init__(
         self.execution_delta = execution_delta
         self.execution_date_fn = execution_date_fn
         self.external_dag_id = external_dag_id
+        self.external_task_group_id = external_task_group_id
         self.external_task_id = external_task_id
         self.check_existence = check_existence

Review comment:
       `self.check_existence = check_existence` is `False` by default, which may make sense for external_dag or external_task. But external_task_group has to look up an existing DAG in order to get the list of task_ids.
   
   I wonder if we can change the default to `True`, or even require `check_existence` to be enabled? This would give more useful errors if the external task/DAG does not exist, and would make the behavior consistent with external_task_group. Also, what would be the use case for a sensor waiting on an object that doesn't exist until it times out?







[GitHub] [airflow] github-actions[bot] commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-810700444


   [The Workflow run](https://github.com/apache/airflow/actions/runs/703462411) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r588860807



##########
File path: airflow/sensors/external_task.py
##########
@@ -134,20 +146,23 @@ def __init__(
         self.execution_delta = execution_delta
         self.execution_date_fn = execution_date_fn
         self.external_dag_id = external_dag_id
+        self.external_task_group_id = external_task_group_id
         self.external_task_id = external_task_id
         self.check_existence = check_existence

Review comment:
       `self.check_existence = check_existence` is `False` by default, which may make sense for external_dag or external_task. But external_task_group has to look up an existing DAG in order to get the list of task_ids.
   
   https://github.com/apache/airflow/blob/fce49402461ee4e7a5f6ffd18cee3121f3496a39/airflow/sensors/external_task.py#L174-L180
   
   I wonder if we can change the default to `True`, or even require `check_existence` to be enabled? This would give more useful errors if the external task/DAG does not exist, and would make the behavior consistent with external_task_group. Also, what would be the use case for a sensor waiting on an object that doesn't exist until it times out?







[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r603840023



##########
File path: airflow/sensors/external_task.py
##########
@@ -206,29 +232,48 @@ def get_count(self, dttm_filter, session, states) -> int:
         """
         TI = TaskInstance
         DR = DagRun
+
         if self.external_task_id:
             count = (
-                session.query(func.count())  # .count() is inefficient
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id == self.external_task_id)
                 .scalar()
             )
-        else:
+        elif self.external_task_group_id:
+            external_task_group_task_ids = self.get_external_task_group_task_ids(session)
             count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),  # pylint: disable=no-member
-                    DR.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id.in_(external_task_group_task_ids))
                 .scalar()
-            )
+            ) / len(external_task_group_task_ids)
+        else:
+            count = self._count_query(DR, session, states, dttm_filter).scalar()
+
         return count
 
+    def _count_query(self, model, session, states, dttm_filter) -> "Query":
+        query = session.query(func.count()).filter(  # .count() is inefficient
+            model.dag_id == self.external_dag_id,
+            model.state.in_(states),  # pylint: disable=no-member
+            model.execution_date.in_(dttm_filter),
+        )
+
+        return query
+
+    def get_external_task_group_task_ids(self, session):
+        """Return task ids for the external TaskGroup"""
+        refreshed_dag_info = DagBag(read_dags_from_db=True).get_dag(self.external_dag_id, session)
+        task_group: Optional["TaskGroup"] = refreshed_dag_info.task_group_dict.get(
+            self.external_task_group_id
+        )
+        if not task_group:
+            raise AirflowException(
+                f"The external task group {self.external_task_group_id} in "
+                f"DAG {self.external_dag_id} does not exist."
+            )
+        task_ids = [task.task_id for task in task_group]
+        return task_ids
+

Review comment:
       This is the main piece, where you retrieve the list of tasks for a TaskGroup. I believe `read_dags_from_db=True` is safe to use here because DAG serialization is enabled by default in 2.0.0; am I correct?
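
The lookup-and-raise pattern in `get_external_task_group_task_ids` can be sketched without Airflow. Everything below is a hypothetical stand-in: `TaskGroupNotFound` takes the place of `AirflowException`, and a plain dict mapping group ids to task ids takes the place of the DAG's task-group mapping.

```python
from typing import Dict, List


class TaskGroupNotFound(Exception):
    """Hypothetical error type standing in for AirflowException."""


def task_ids_for_group(
    group_to_tasks: Dict[str, List[str]], dag_id: str, group_id: str
) -> List[str]:
    """Return the task ids of one group, or raise a descriptive error if it is missing."""
    task_ids = group_to_tasks.get(group_id)
    if task_ids is None:
        raise TaskGroupNotFound(
            f"The external task group {group_id} in DAG {dag_id} does not exist."
        )
    return list(task_ids)


groups = {"dummy_task_group": ["dummy_task_group.t1", "dummy_task_group.t2"]}
print(task_ids_for_group(groups, "unit_test_dag", "dummy_task_group"))
```

Raising on a missing group, rather than returning an empty list, means a mistyped group id surfaces as an error instead of a sensor that silently waits on nothing.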







[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r588860117



##########
File path: airflow/sensors/external_task.py
##########
@@ -206,29 +228,48 @@ def get_count(self, dttm_filter, session, states) -> int:
         """
         TI = TaskInstance
         DR = DagRun
+
         if self.external_task_id:
             count = (
-                session.query(func.count())  # .count() is inefficient
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id == self.external_task_id)
                 .scalar()
             )
-        else:
+        elif self.external_task_group_id:
+            external_task_group_task_ids = self.get_external_task_group_task_ids(session)
             count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),  # pylint: disable=no-member
-                    DR.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id.in_(external_task_group_task_ids))
                 .scalar()
-            )
+            ) / len(external_task_group_task_ids)

Review comment:
       This is to keep the poking check behavior `return count_allowed == len(dttm_filter)`
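
A small worked example of that normalization, as a sketch in plain Python. The function name `normalized_count` and the numbers are made up for illustration; only the division by the number of tasks in the group comes from the diff.

```python
def normalized_count(raw_ti_count: int, n_group_tasks: int) -> float:
    """Divide the matching task-instance count by the number of tasks in the group."""
    return raw_ti_count / n_group_tasks


n_dates = 2        # len(dttm_filter)
n_group_tasks = 3  # tasks in the external TaskGroup

# All 3 tasks are in an allowed state for both dates: 6 matching task instances.
print(normalized_count(6, n_group_tasks) == n_dates)

# One task is not yet in an allowed state for one date: 5 matching task instances.
print(normalized_count(5, n_group_tasks) == n_dates)
```

The first comparison is true and the second is false, so the equality check against `len(dttm_filter)` only passes when every task in the group matches for every date. Note that `/` is true division in Python 3, so the group branch returns a float even though `get_count` is annotated `-> int`.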
   
   







[GitHub] [airflow] xinbinhuang commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-997456622


   > @xinbinhuang will you have time to complete it?
   
   Thanks for the nudge! Will try to wrap it up before the holidays hit.





[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r588859705



##########
File path: airflow/sensors/external_task.py
##########
@@ -164,18 +179,23 @@ def poke(self, context, session=None):
         if self.failed_states:
             count_failed = self.get_count(dttm_filter, session, self.failed_states)
 
-        if count_failed == len(dttm_filter):
+        if 0 < count_failed <= len(dttm_filter):

Review comment:
       Here I am assuming that as long as there is at least one external task failure, we will want to fail the sensor. This changes the original behavior, but I wonder in what situation users would want to allow partial failure?

##########
File path: airflow/sensors/external_task.py
##########
@@ -206,29 +228,48 @@ def get_count(self, dttm_filter, session, states) -> int:
         """
         TI = TaskInstance
         DR = DagRun
+
         if self.external_task_id:
             count = (
-                session.query(func.count())  # .count() is inefficient
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id == self.external_task_id)
                 .scalar()
             )
-        else:
+        elif self.external_task_group_id:
+            external_task_group_task_ids = self.get_external_task_group_task_ids(session)
             count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),  # pylint: disable=no-member
-                    DR.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id.in_(external_task_group_task_ids))
                 .scalar()
-            )
+            ) / len(external_task_group_task_ids)

Review comment:
       This tries to preserve the original behavior; we don't need it if we fail the sensor whenever one of the tasks fails.







[GitHub] [airflow] yuqian90 commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r614600073



##########
File path: airflow/models/dag.py
##########
@@ -745,10 +745,18 @@ def tasks(self, val):
     def task_ids(self) -> List[str]:
         return list(self.task_dict.keys())
 
+    @property
+    def task_group_dict(self) -> Dict[str, "TaskGroup"]:
+        return {k: v for k, v in self.task_group.get_task_group_dict().items() if k is not None}

Review comment:
       `get_task_group_dict()` is a recursive function that can be costly. I think we should keep it a method instead of making it a property (which tends to suggest to users that it's cheap to access).
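
A rough illustration of the cost argument, using a hypothetical `Group` class rather than Airflow's `TaskGroup`: the dictionary is rebuilt by walking the whole tree on every call, which is easy to overlook when the call is hidden behind a property.

```python
from typing import Dict, List, Optional


class Group:
    """Hypothetical stand-in for a nested task group."""

    def __init__(self, group_id: Optional[str], children: Optional[List["Group"]] = None):
        self.group_id = group_id
        self.children = children or []

    def get_group_dict(self) -> Dict[Optional[str], "Group"]:
        """Walk the whole tree on every call; the cost grows with the number of groups."""
        result: Dict[Optional[str], "Group"] = {self.group_id: self}
        for child in self.children:
            result.update(child.get_group_dict())
        return result


# The root group has no id, mirroring the `if k is not None` filter in the diff.
root = Group(None, [Group("extract", [Group("extract.inner")]), Group("load")])

# Call the method once and reuse the result instead of traversing repeatedly.
group_dict = {k: v for k, v in root.get_group_dict().items() if k is not None}
print(sorted(group_dict))
```

Keeping this as an explicit method call makes the traversal visible at the call site, so callers are more likely to store the result than to recompute it inside a loop.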

##########
File path: airflow/models/dag.py
##########
@@ -1201,7 +1209,7 @@ def clear(
             tis = tis.filter(TI.task_id.in_(self.task_ids))
 
         if include_parentdag and self.is_subdag and self.parent_dag is not None:
-            p_dag = self.parent_dag.sub_dag(
+            p_dag = self.parent_dag.partial_subset(

Review comment:
       Since you are changing this to `partial_subset`, let's use the new feature of this method. You may consider passing it like this to avoid the regex dance: 
   ```
   task_ids_or_regex=[self.dag_id.split('.')[1]]
   ```
   
   This is what the docstring of `partial_subset` says:
   ```
           :param task_ids_or_regex: Either a list of task_ids, or a regex to
               match against task ids (as a string, or compiled regex pattern).
   ```
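
One reason exact ids are safer than an anchored pattern can be shown with Python's `re` module alone. This sketch does not call `partial_subset`; the task ids are invented, and the three list comprehensions only contrast regex matching with exact membership.

```python
import re

task_ids = ["section.load", "sectionXload", "section.load_extra"]
target = "section.load"

# Anchored but unescaped: "." is a regex wildcard, so a second id matches too.
unescaped = [t for t in task_ids if re.search(f"^{target}$", t)]

# Escaping fixes the pattern, at the cost of extra ceremony.
escaped = [t for t in task_ids if re.search(f"^{re.escape(target)}$", t)]

# Exact membership in a list needs no pattern at all.
exact = [t for t in task_ids if t in [target]]

print(unescaped)
print(escaped)
print(exact)
```

The unescaped pattern also picks up `sectionXload`, while the escaped pattern and the list lookup both return only `section.load`.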

##########
File path: airflow/sensors/external_task.py
##########
@@ -206,29 +232,48 @@ def get_count(self, dttm_filter, session, states) -> int:
         """
         TI = TaskInstance
         DR = DagRun
+
         if self.external_task_id:
             count = (
-                session.query(func.count())  # .count() is inefficient
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id == self.external_task_id)
                 .scalar()
             )
-        else:
+        elif self.external_task_group_id:
+            external_task_group_task_ids = self.get_external_task_group_task_ids(session)
             count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),  # pylint: disable=no-member
-                    DR.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id.in_(external_task_group_task_ids))
                 .scalar()
-            )
+            ) / len(external_task_group_task_ids)
+        else:
+            count = self._count_query(DR, session, states, dttm_filter).scalar()
+
         return count
 
+    def _count_query(self, model, session, states, dttm_filter) -> "Query":
+        query = session.query(func.count()).filter(  # .count() is inefficient
+            model.dag_id == self.external_dag_id,
+            model.state.in_(states),  # pylint: disable=no-member
+            model.execution_date.in_(dttm_filter),
+        )
+
+        return query
+
+    def get_external_task_group_task_ids(self, session):
+        """Return task ids for the external TaskGroup"""
+        refreshed_dag_info = DagBag(read_dags_from_db=True).get_dag(self.external_dag_id, session)
+        task_group: Optional["TaskGroup"] = refreshed_dag_info.task_group_dict.get(
+            self.external_task_group_id
+        )
+        if not task_group:
+            raise AirflowException(
+                f"The external task group {self.external_task_group_id} in "
+                f"DAG {self.external_dag_id} does not exist."
+            )
+        task_ids = [task.task_id for task in task_group]
+        return task_ids
+

Review comment:
       The existing task execution code creates a DagBag on its own instead of reading serialized DAGs from the DB. For example, the line linked below creates a DagBag. I think we should do the same here. It's important for tasks to get the latest view of the DAG during execution.
   
   https://github.com/apache/airflow/blob/f1edc220d3f9cb050016d23246a682276bd09eee/airflow/sensors/external_task.py#L213
   

##########
File path: tests/sensors/test_external_task.py
##########
@@ -445,7 +545,7 @@ def clear_tasks(dag_bag, dag, task, start_date=DEFAULT_DATE, end_date=DEFAULT_DA
     """
     Clear the task and its downstream tasks recursively for the dag in the given dagbag.
     """
-    subdag = dag.sub_dag(task_ids_or_regex=f"^{task.task_id}$", include_downstream=True)
+    subdag = dag.partial_subset(task_ids_or_regex=f"^{task.task_id}$", include_downstream=True)

Review comment:
       Same here: `task_ids_or_regex` can be a list of task ids.

##########
File path: airflow/models/dag.py
##########
@@ -1282,7 +1290,7 @@ def clear(
                             external_dag = dag_bag.get_dag(tii.dag_id)
                             if not external_dag:
                                 raise AirflowException(f"Could not find dag {tii.dag_id}")
-                            downstream = external_dag.sub_dag(
+                            downstream = external_dag.partial_subset(

Review comment:
       Same here. This wants an exact match on the task_id, so passing `task_ids_or_regex` as a list of task ids is better.

##########
File path: airflow/models/dag.py
##########
@@ -745,10 +745,18 @@ def tasks(self, val):
     def task_ids(self) -> List[str]:
         return list(self.task_dict.keys())
 
+    @property
+    def task_group_dict(self) -> Dict[str, "TaskGroup"]:
+        return {k: v for k, v in self.task_group.get_task_group_dict().items() if k is not None}
+
     @property
     def task_group(self) -> "TaskGroup":
         return self._task_group
 
+    @property
+    def task_groups(self) -> List["TaskGroup"]:
+        return list(self.task_group_dict.values())

Review comment:
       Same here for `task_groups`

##########
File path: tests/sensors/test_external_task.py
##########
@@ -24,32 +23,49 @@
 from airflow.exceptions import AirflowException, AirflowSensorTimeout
 from airflow.models import DagBag, TaskInstance
 from airflow.models.dag import DAG
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
 from airflow.sensors.time_sensor import TimeSensor
 from airflow.serialization.serialized_objects import SerializedBaseOperator
 from airflow.utils.state import State
+from airflow.utils.task_group import TaskGroup
 from airflow.utils.timezone import datetime
+from tests.test_utils.db import clear_db_runs
 
 DEFAULT_DATE = datetime(2015, 1, 1)
 TEST_DAG_ID = 'unit_test_dag'
 TEST_TASK_ID = 'time_sensor_check'
+TEST_TASK_GROUP_ID = 'dummy_task_group'
 DEV_NULL = '/dev/null'
 
 
-class TestExternalTaskSensor(unittest.TestCase):
-    def setUp(self):
-        self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
+class TestExternalTaskSensor:
+    def setup_method(self):
         self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
         self.dag = DAG(TEST_DAG_ID, default_args=self.args)
+        SerializedDagModel.write_dag(self.dag)

Review comment:
       I don't think `SerializedDagModel.write_dag` is needed if the task creates its own DagBag as it did previously.







[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r608782468



##########
File path: airflow/sensors/external_task.py
##########
@@ -164,18 +184,23 @@ def poke(self, context, session=None):
         if self.failed_states:
             count_failed = self.get_count(dttm_filter, session, self.failed_states)
 
-        if count_failed == len(dttm_filter):
+        if count_failed > 0:

Review comment:
       I don't think anything more than an entry in `UPDATING.md` is necessary. The only situation where you will have multiple counts is when `execution_date_fn` returns more than one execution date to wait for. However, the original behavior gets you into a weird state when only some of the TIs fail, e.g. one fails and one succeeds, and the sensor ends up timing out. IMHO, this is more like a bug than intended behavior.







[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r603840023



##########
File path: airflow/sensors/external_task.py
##########
@@ -206,29 +232,48 @@ def get_count(self, dttm_filter, session, states) -> int:
         """
         TI = TaskInstance
         DR = DagRun
+
         if self.external_task_id:
             count = (
-                session.query(func.count())  # .count() is inefficient
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id == self.external_task_id)
                 .scalar()
             )
-        else:
+        elif self.external_task_group_id:
+            external_task_group_task_ids = self.get_external_task_group_task_ids(session)
             count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),  # pylint: disable=no-member
-                    DR.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id.in_(external_task_group_task_ids))
                 .scalar()
-            )
+            ) / len(external_task_group_task_ids)
+        else:
+            count = self._count_query(DR, session, states, dttm_filter).scalar()
+
         return count
 
+    def _count_query(self, model, session, states, dttm_filter) -> "Query":
+        query = session.query(func.count()).filter(  # .count() is inefficient
+            model.dag_id == self.external_dag_id,
+            model.state.in_(states),  # pylint: disable=no-member
+            model.execution_date.in_(dttm_filter),
+        )
+
+        return query
+
+    def get_external_task_group_task_ids(self, session):
+        """Return task ids for the external TaskGroup"""
+        refreshed_dag_info = DagBag(read_dags_from_db=True).get_dag(self.external_dag_id, session)
+        task_group: Optional["TaskGroup"] = refreshed_dag_info.task_group_dict.get(
+            self.external_task_group_id
+        )
+        if not task_group:
+            raise AirflowException(
+                f"The external task group {self.external_task_group_id} in "
+                f"DAG {self.external_dag_id} does not exist."
+            )
+        task_ids = [task.task_id for task in task_group]
+        return task_ids
+

Review comment:
       This is the main piece: it retrieves the list of tasks for a TaskGroup. I believe `read_dags_from_db=True` is safe to use here because DAG serialization is enabled by default in 2.0. @ashb 
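
As a rough illustration of why the task-group branch divides by the group size, here is a sketch in plain Python. It models task instances as `(task_id, execution_date, state)` tuples instead of querying the database; `count_group_matches` and the sample data are hypothetical.

```python
from typing import Iterable, List, Tuple

# (task_id, execution_date, state): a stand-in for TaskInstance rows.
Row = Tuple[str, str, str]


def count_group_matches(
    rows: Iterable[Row],
    group_task_ids: List[str],
    dates: List[str],
    states: List[str],
) -> float:
    """Count matching rows, normalized to 'number of execution dates'."""
    matching = sum(
        1
        for task_id, execution_date, state in rows
        if task_id in group_task_ids and execution_date in dates and state in states
    )
    # Each execution date contributes one row per task in the group, so dividing
    # by the group size makes the result comparable to len(dates).
    return matching / len(group_task_ids)


rows = [
    ("group.a", "2021-01-01", "success"),
    ("group.b", "2021-01-01", "success"),
    ("group.a", "2021-01-02", "success"),
    ("group.b", "2021-01-02", "running"),
]
group = ["group.a", "group.b"]
dates = ["2021-01-01", "2021-01-02"]

print(count_group_matches(rows, group, dates, ["success"]))  # 1.5
```

Here the result (1.5) is not equal to `len(dates)` (2), so a sensor comparing the two would keep poking. Note that the division yields a float, while `get_count` in the diff is annotated as returning `int`.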







[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r588860807



##########
File path: airflow/sensors/external_task.py
##########
@@ -134,20 +146,23 @@ def __init__(
         self.execution_delta = execution_delta
         self.execution_date_fn = execution_date_fn
         self.external_dag_id = external_dag_id
+        self.external_task_group_id = external_task_group_id
         self.external_task_id = external_task_id
         self.check_existence = check_existence

Review comment:
       This is `False` by default, which may make sense for external_dag or external_task. But external_task_group has to look up an existing DAG in order to get the list of task_ids. 
   
   I wonder if we can change the default to `True`, or even make `check_existence` always enabled? This would give more useful errors when the external task/DAG does not exist. Also, I can't think of a reason to have a sensor wait for an object that doesn't exist until it times out.
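
A sketch of the difference between failing fast and waiting for a timeout, in plain Python rather than Airflow code. The `poke_once` function, the `ExternalObjectMissing` exception and the `known_groups` mapping are hypothetical stand-ins.

```python
from typing import Dict, List


class ExternalObjectMissing(Exception):
    """Hypothetical error raised when the awaited object does not exist."""


def poke_once(
    known_groups: Dict[str, List[str]],
    group_id: str,
    check_existence: bool,
) -> bool:
    """Return True when the wait is over, False to poke again later."""
    task_ids = known_groups.get(group_id)
    if task_ids is None:
        if check_existence:
            # Fail immediately with a useful message.
            raise ExternalObjectMissing(f"Task group {group_id!r} does not exist.")
        # Without the check, the caller just keeps poking until it times out.
        return False
    # Placeholder: a real sensor would now query the states of task_ids.
    return True


known_groups = {"extract": ["extract.a", "extract.b"]}

print(poke_once(known_groups, "typo_group", check_existence=False))  # False
try:
    poke_once(known_groups, "typo_group", check_existence=True)
except ExternalObjectMissing as err:
    print(err)  # Task group 'typo_group' does not exist.
```

For a task group the lookup has to happen anyway, since the list of task ids comes from the external DAG, so raising at that point costs nothing extra.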







[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r603839218



##########
File path: airflow/sensors/external_task.py
##########
@@ -164,18 +184,23 @@ def poke(self, context, session=None):
         if self.failed_states:
             count_failed = self.get_count(dttm_filter, session, self.failed_states)
 
-        if count_failed == len(dttm_filter):
+        if count_failed > 0:

Review comment:
       Here I am assuming that if at least one external task fails, we want to fail the sensor. Though this changes the original behavior, I think this will be better.







[GitHub] [airflow] github-actions[bot] commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-810700606


   [The Workflow run](https://github.com/apache/airflow/actions/runs/703463197) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] kaxil commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-919071392


   Can you fix the conflicts please @xinbinhuang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] xinbinhuang commented on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-816928243


   @KevinYang21 @kaxil @yuqian90 would love to have your feedback :) (not pinging ash because he's on holiday)





[GitHub] [airflow] eladkal edited a comment on pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

Posted by GitBox <gi...@apache.org>.
eladkal edited a comment on pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#issuecomment-997460724


   > Thanks for the nudge! Will try to wrap it up before the holidays hit.
   
   Great :) re-opening so it won't be missed 

