You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/13 09:44:30 UTC
[airflow] branch main updated: Add template support for external_task_ids. (#22809)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7331eefc39 Add template support for external_task_ids. (#22809)
7331eefc39 is described below
commit 7331eefc393b8f1fae6f3cf061cf17eb5eaa3fc8
Author: Karthikeyan Singaravelan <ti...@gmail.com>
AuthorDate: Wed Apr 13 15:14:21 2022 +0530
Add template support for external_task_ids. (#22809)
---
airflow/sensors/external_task.py | 2 +-
tests/sensors/test_external_task_sensor.py | 43 ++++++++++++++++++++++++++++++
2 files changed, 44 insertions(+), 1 deletion(-)
diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index b0ffed69bd..be860611f9 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -77,7 +77,7 @@ class ExternalTaskSensor(BaseSensorOperator):
or DAG does not exist (default value: False).
"""
- template_fields = ['external_dag_id', 'external_task_id']
+ template_fields = ['external_dag_id', 'external_task_id', 'external_task_ids']
ui_color = '#19647e'
@property
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index f30172e1d4..1bcfbfbb3f 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -134,6 +134,48 @@ class TestExternalTaskSensor(unittest.TestCase):
"unit_test_dag failed."
)
+ def test_external_task_sensor_external_task_id_param(self):
+ """Test external_task_ids is set properly when external_task_id is passed as a template"""
+ self.test_time_sensor()
+ op = ExternalTaskSensor(
+ task_id='test_external_task_sensor_check',
+ external_dag_id='{{ params.dag_id }}',
+ external_task_id='{{ params.task_id }}',
+ params={
+ 'dag_id': TEST_DAG_ID,
+ 'task_id': TEST_TASK_ID,
+ },
+ dag=self.dag,
+ )
+
+ with self.assertLogs(op.log, level=logging.INFO) as cm:
+ op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ assert (
+ f"INFO:airflow.task.operators:Poking for tasks ['{TEST_TASK_ID}'] "
+ f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
+ )
+
+ def test_external_task_sensor_external_task_ids_param(self):
+ """Test external_task_ids rendering when a template is passed."""
+ self.test_time_sensor()
+ op = ExternalTaskSensor(
+ task_id='test_external_task_sensor_check',
+ external_dag_id='{{ params.dag_id }}',
+ external_task_ids=['{{ params.task_id }}'],
+ params={
+ 'dag_id': TEST_DAG_ID,
+ 'task_id': TEST_TASK_ID,
+ },
+ dag=self.dag,
+ )
+
+ with self.assertLogs(op.log, level=logging.INFO) as cm:
+ op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ assert (
+ f"INFO:airflow.task.operators:Poking for tasks ['{TEST_TASK_ID}'] "
+ f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
+ )
+
def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self):
self.test_time_sensor(task_id=TEST_TASK_ID)
self.test_time_sensor(task_id=TEST_TASK_ID_ALTERNATE)
@@ -421,6 +463,7 @@ def test_external_task_sensor_templated(dag_maker, app):
assert instance.task.external_dag_id == f"dag_{DEFAULT_DATE.date()}"
assert instance.task.external_task_id == f"task_{DEFAULT_DATE.date()}"
+ assert instance.task.external_task_ids == [f"task_{DEFAULT_DATE.date()}"]
# Verify that the operator link uses the rendered value of ``external_dag_id``.
app.config['SERVER_NAME'] = ""