You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/07/06 20:22:14 UTC

[airflow] 02/39: Fix ExternalTaskSensor when there is not task group TIs for the current execution date (#32009)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3815a48b978ebfb9737ec949922dff3ae62e1a64
Author: Hussein Awala <hu...@awala.fr>
AuthorDate: Wed Jun 21 11:55:45 2023 +0200

    Fix ExternalTaskSensor when there is not task group TIs for the current execution date (#32009)
    
    * Add a check on none TIs for the current execution date
    
    Signed-off-by: Hussein Awala <hu...@awala.fr>
    
    * replace inline if-else by old one
    
    Signed-off-by: Hussein Awala <hu...@awala.fr>
    
    ---------
    
    Signed-off-by: Hussein Awala <hu...@awala.fr>
    (cherry picked from commit 14eb1d3116ecef15be7be9a8f9d08757e74f981c)
---
 airflow/sensors/external_task.py           | 13 ++++++++-----
 tests/sensors/test_external_task_sensor.py | 20 ++++++++++++++++++++
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 959ebe5131..69ba41ef09 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -366,11 +366,14 @@ class ExternalTaskSensor(BaseSensorOperator):
             ) / len(self.external_task_ids)
         elif self.external_task_group_id:
             external_task_group_task_ids = self.get_external_task_group_task_ids(session, dttm_filter)
-            count = (
-                self._count_query(TI, session, states, dttm_filter)
-                .filter(tuple_in_condition((TI.task_id, TI.map_index), external_task_group_task_ids))
-                .scalar()
-            ) / len(external_task_group_task_ids)
+            if not external_task_group_task_ids:
+                count = 0
+            else:
+                count = (
+                    self._count_query(TI, session, states, dttm_filter)
+                    .filter(tuple_in_condition((TI.task_id, TI.map_index), external_task_group_task_ids))
+                    .scalar()
+                ) / len(external_task_group_task_ids)
         else:
             count = self._count_query(DR, session, states, dttm_filter).scalar()
         return count
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 966c3a2139..7c62357e9e 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -808,6 +808,26 @@ exit 0
         ):
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_external_task_group_when_there_is_no_TIs(self):
+        """Test that the sensor does not fail when there are no TIs to check."""
+        self.add_time_sensor()
+        self.add_dummy_task_group_with_dynamic_tasks(State.FAILED)
+        op = ExternalTaskSensor(
+            task_id="test_external_task_sensor_check",
+            external_dag_id=TEST_DAG_ID,
+            external_task_group_id=TEST_TASK_GROUP_ID,
+            failed_states=[State.FAILED],
+            dag=self.dag,
+            poke_interval=1,
+            timeout=3,
+        )
+        with pytest.raises(AirflowSensorTimeout):
+            op.run(
+                start_date=DEFAULT_DATE + timedelta(hours=1),
+                end_date=DEFAULT_DATE + timedelta(hours=1),
+                ignore_ti_state=True,
+            )
+
 
 def test_external_task_sensor_check_zipped_dag_existence(dag_zip_maker):
     with dag_zip_maker("test_external_task_sensor_check_existense.py") as dagbag: