Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/03/06 09:40:14 UTC

[GitHub] [airflow] xinbinhuang commented on a change in pull request #14640: Allow ExternalTaskSensor to wait for taskgroup

xinbinhuang commented on a change in pull request #14640:
URL: https://github.com/apache/airflow/pull/14640#discussion_r588859705



##########
File path: airflow/sensors/external_task.py
##########
@@ -164,18 +179,23 @@ def poke(self, context, session=None):
         if self.failed_states:
             count_failed = self.get_count(dttm_filter, session, self.failed_states)
 
-        if count_failed == len(dttm_filter):
+        if 0 < count_failed <= len(dttm_filter):

Review comment:
      Here I am making the assumption that as long as there is at least one external task failure, we will want to fail the sensor. Although this changes the original behavior, I wonder in what situations users would want to allow partial failure?
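
A minimal sketch of the two policies being compared (the function names are illustrative, not Airflow's API): the original check fails the sensor only when every poked execution date has a failed state, while the proposed check fails as soon as any one does.

```python
# Hypothetical helpers contrasting the original and proposed predicates.
# count_failed is the number of execution dates with a failed state;
# num_dates corresponds to len(dttm_filter) in the sensor.

def should_fail_original(count_failed: int, num_dates: int) -> bool:
    # Original behavior: fail only when ALL poked execution dates failed.
    return count_failed == num_dates

def should_fail_proposed(count_failed: int, num_dates: int) -> bool:
    # Proposed behavior: fail as soon as ANY external task failed.
    return 0 < count_failed <= num_dates

# With 1 failure out of 3 dates, only the proposed policy trips.
print(should_fail_original(1, 3))  # False
print(should_fail_proposed(1, 3))  # True
```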

##########
File path: airflow/sensors/external_task.py
##########
@@ -206,29 +228,48 @@ def get_count(self, dttm_filter, session, states) -> int:
         """
         TI = TaskInstance
         DR = DagRun
+
         if self.external_task_id:
             count = (
-                session.query(func.count())  # .count() is inefficient
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id == self.external_task_id,
-                    TI.state.in_(states),  # pylint: disable=no-member
-                    TI.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id == self.external_task_id)
                 .scalar()
             )
-        else:
+        elif self.external_task_group_id:
+            external_task_group_task_ids = self.get_external_task_group_task_ids(session)
             count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),  # pylint: disable=no-member
-                    DR.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id.in_(external_task_group_task_ids))
                 .scalar()
-            )
+            ) / len(external_task_group_task_ids)

Review comment:
      This is trying to converge back to the original behavior, and we don't need this if we fail the sensor whenever one of the tasks fails.
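
To illustrate why the division converges back to the original behavior (illustrative arithmetic, not Airflow code): a task group with N tasks matched across D execution dates yields N*D task-instance rows, so dividing by N produces a per-execution-date count comparable to `len(dttm_filter)`, just like the single-task and whole-DAG branches.

```python
# Assumed example figures, chosen only to show the normalization.
num_group_tasks = 4   # tasks inside the external task group
num_dates = 3         # execution dates being poked (len(dttm_filter))

# If every task in the group reached the target state on every date,
# the query counts one row per (task, date) pair.
matching_rows = num_group_tasks * num_dates

# Dividing by the group size recovers a count in units of execution
# dates, which the sensor can compare against len(dttm_filter).
normalized = matching_rows / num_group_tasks
print(normalized)  # 3.0
```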




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org