You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/13 09:44:30 UTC

[airflow] branch main updated: Add template support for external_task_ids. (#22809)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7331eefc39 Add template support for external_task_ids. (#22809)
7331eefc39 is described below

commit 7331eefc393b8f1fae6f3cf061cf17eb5eaa3fc8
Author: Karthikeyan Singaravelan <ti...@gmail.com>
AuthorDate: Wed Apr 13 15:14:21 2022 +0530

    Add template support for external_task_ids. (#22809)
---
 airflow/sensors/external_task.py           |  2 +-
 tests/sensors/test_external_task_sensor.py | 43 ++++++++++++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index b0ffed69bd..be860611f9 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -77,7 +77,7 @@ class ExternalTaskSensor(BaseSensorOperator):
         or DAG does not exist (default value: False).
     """
 
-    template_fields = ['external_dag_id', 'external_task_id']
+    template_fields = ['external_dag_id', 'external_task_id', 'external_task_ids']
     ui_color = '#19647e'
 
     @property
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index f30172e1d4..1bcfbfbb3f 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -134,6 +134,48 @@ class TestExternalTaskSensor(unittest.TestCase):
                 "unit_test_dag failed."
             )
 
+    def test_external_task_sensor_external_task_id_param(self):
+        """Test external_task_ids is set properly when external_task_id is passed as a template"""
+        self.test_time_sensor()
+        op = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check',
+            external_dag_id='{{ params.dag_id }}',
+            external_task_id='{{ params.task_id }}',
+            params={
+                'dag_id': TEST_DAG_ID,
+                'task_id': TEST_TASK_ID,
+            },
+            dag=self.dag,
+        )
+
+        with self.assertLogs(op.log, level=logging.INFO) as cm:
+            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            assert (
+                f"INFO:airflow.task.operators:Poking for tasks ['{TEST_TASK_ID}'] "
+                f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
+            )
+
+    def test_external_task_sensor_external_task_ids_param(self):
+        """Test external_task_ids rendering when a template is passed."""
+        self.test_time_sensor()
+        op = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check',
+            external_dag_id='{{ params.dag_id }}',
+            external_task_ids=['{{ params.task_id }}'],
+            params={
+                'dag_id': TEST_DAG_ID,
+                'task_id': TEST_TASK_ID,
+            },
+            dag=self.dag,
+        )
+
+        with self.assertLogs(op.log, level=logging.INFO) as cm:
+            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+            assert (
+                f"INFO:airflow.task.operators:Poking for tasks ['{TEST_TASK_ID}'] "
+                f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
+            )
+
     def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self):
         self.test_time_sensor(task_id=TEST_TASK_ID)
         self.test_time_sensor(task_id=TEST_TASK_ID_ALTERNATE)
@@ -421,6 +463,7 @@ def test_external_task_sensor_templated(dag_maker, app):
 
     assert instance.task.external_dag_id == f"dag_{DEFAULT_DATE.date()}"
     assert instance.task.external_task_id == f"task_{DEFAULT_DATE.date()}"
+    assert instance.task.external_task_ids == [f"task_{DEFAULT_DATE.date()}"]
 
     # Verify that the operator link uses the rendered value of ``external_dag_id``.
     app.config['SERVER_NAME'] = ""