You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/07/06 20:22:14 UTC
[airflow] 02/39: Fix ExternalTaskSensor when there are no task group TIs for the current execution date (#32009)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3815a48b978ebfb9737ec949922dff3ae62e1a64
Author: Hussein Awala <hu...@awala.fr>
AuthorDate: Wed Jun 21 11:55:45 2023 +0200
Fix ExternalTaskSensor when there are no task group TIs for the current execution date (#32009)
* Add a check for the case where there are no TIs for the current execution date
Signed-off-by: Hussein Awala <hu...@awala.fr>
* Replace the inline if-else expression with a regular if/else statement
Signed-off-by: Hussein Awala <hu...@awala.fr>
---------
Signed-off-by: Hussein Awala <hu...@awala.fr>
(cherry picked from commit 14eb1d3116ecef15be7be9a8f9d08757e74f981c)
---
airflow/sensors/external_task.py | 13 ++++++++-----
tests/sensors/test_external_task_sensor.py | 20 ++++++++++++++++++++
2 files changed, 28 insertions(+), 5 deletions(-)
diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 959ebe5131..69ba41ef09 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -366,11 +366,14 @@ class ExternalTaskSensor(BaseSensorOperator):
) / len(self.external_task_ids)
elif self.external_task_group_id:
external_task_group_task_ids = self.get_external_task_group_task_ids(session, dttm_filter)
- count = (
- self._count_query(TI, session, states, dttm_filter)
- .filter(tuple_in_condition((TI.task_id, TI.map_index), external_task_group_task_ids))
- .scalar()
- ) / len(external_task_group_task_ids)
+ if not external_task_group_task_ids:
+ count = 0
+ else:
+ count = (
+ self._count_query(TI, session, states, dttm_filter)
+ .filter(tuple_in_condition((TI.task_id, TI.map_index), external_task_group_task_ids))
+ .scalar()
+ ) / len(external_task_group_task_ids)
else:
count = self._count_query(DR, session, states, dttm_filter).scalar()
return count
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 966c3a2139..7c62357e9e 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -808,6 +808,26 @@ exit 0
):
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ def test_external_task_group_when_there_is_no_TIs(self):
+ """Test that the sensor does not fail when there are no TIs to check."""
+ self.add_time_sensor()
+ self.add_dummy_task_group_with_dynamic_tasks(State.FAILED)
+ op = ExternalTaskSensor(
+ task_id="test_external_task_sensor_check",
+ external_dag_id=TEST_DAG_ID,
+ external_task_group_id=TEST_TASK_GROUP_ID,
+ failed_states=[State.FAILED],
+ dag=self.dag,
+ poke_interval=1,
+ timeout=3,
+ )
+ with pytest.raises(AirflowSensorTimeout):
+ op.run(
+ start_date=DEFAULT_DATE + timedelta(hours=1),
+ end_date=DEFAULT_DATE + timedelta(hours=1),
+ ignore_ti_state=True,
+ )
+
def test_external_task_sensor_check_zipped_dag_existence(dag_zip_maker):
with dag_zip_maker("test_external_task_sensor_check_existense.py") as dagbag: