You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2022/08/22 18:13:19 UTC

[airflow] branch main updated: Add support for TaskGroup in ExternalTaskSensor (#24902)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bc04c5ff0f Add support for TaskGroup in ExternalTaskSensor (#24902)
bc04c5ff0f is described below

commit bc04c5ff0fa56e80d3d5def38b798170f6575ee8
Author: Ashish Patel <as...@gmail.com>
AuthorDate: Mon Aug 22 19:13:09 2022 +0100

    Add support for TaskGroup in ExternalTaskSensor (#24902)
---
 .../example_external_task_marker_dag.py            |  17 ++-
 airflow/models/dag.py                              |   7 ++
 airflow/sensors/external_task.py                   | 125 +++++++++++++++------
 .../howto/operator/external_task_sensor.rst        |  10 ++
 tests/sensors/test_external_task_sensor.py         | 120 +++++++++++++++++---
 5 files changed, 226 insertions(+), 53 deletions(-)

diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py
index 88dae819de..874658d9bb 100644
--- a/airflow/example_dags/example_external_task_marker_dag.py
+++ b/airflow/example_dags/example_external_task_marker_dag.py
@@ -80,5 +80,18 @@ with DAG(
         mode="reschedule",
     )
     # [END howto_operator_external_task_sensor]
-    child_task2 = EmptyOperator(task_id="child_task2")
-    child_task1 >> child_task2
+
+    # [START howto_operator_external_task_sensor_with_task_group]
+    child_task2 = ExternalTaskSensor(
+        task_id="child_task2",
+        external_dag_id=parent_dag.dag_id,
+        external_task_group_id='parent_dag_task_group_id',
+        timeout=600,
+        allowed_states=['success'],
+        failed_states=['failed', 'skipped'],
+        mode="reschedule",
+    )
+    # [END howto_operator_external_task_sensor_with_task_group]
+
+    child_task3 = EmptyOperator(task_id="child_task3")
+    child_task1 >> child_task2 >> child_task3
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index b3318c9fd8..384ed840f1 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2214,6 +2214,13 @@ class DAG(LoggingMixin):
     def has_task(self, task_id: str):
         return task_id in self.task_dict
 
+    def has_task_group(self, task_group_id: str) -> bool:
+        return task_group_id in self.task_group_dict
+
+    @cached_property
+    def task_group_dict(self):
+        return {k: v for k, v in self._task_group.get_task_group_dict().items() if k is not None}
+
     def get_task(self, task_id: str, include_subdags: bool = False) -> Operator:
         if task_id in self.task_dict:
             return self.task_dict[task_id]
diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 7081651a68..19005ea19e 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -36,6 +36,9 @@ from airflow.utils.helpers import build_airflow_url_with_query
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Query
+
 
 class ExternalDagLink(BaseOperatorLink):
     """
@@ -54,9 +57,13 @@ class ExternalDagLink(BaseOperatorLink):
 
 class ExternalTaskSensor(BaseSensorOperator):
     """
-    Waits for a different DAG or a task in a different DAG to complete for a
+    Waits for a different DAG, a task group, or a task in a different DAG to complete for a
     specific logical date.
 
+    If both `external_task_group_id` and `external_task_id` are ``None`` (default), the sensor
+    waits for the DAG.
+    Values for `external_task_group_id` and `external_task_id` can't be set at the same time.
+
     By default the ExternalTaskSensor will wait for the external task to
     succeed, at which point it will also succeed. However, by default it will
     *not* fail if the external task fails, but will continue to check the status
@@ -78,7 +85,7 @@ class ExternalTaskSensor(BaseSensorOperator):
     :param external_dag_id: The dag_id that contains the task you want to
         wait for
     :param external_task_id: The task_id that contains the task you want to
-        wait for. If ``None`` (default value) the sensor waits for the DAG
+        wait for.
     :param external_task_ids: The list of task_ids that you want to wait for.
         If ``None`` (default value) the sensor waits for the DAG. Either
         external_task_id or external_task_ids can be passed to
@@ -111,6 +118,7 @@ class ExternalTaskSensor(BaseSensorOperator):
         external_dag_id: str,
         external_task_id: Optional[str] = None,
         external_task_ids: Optional[Collection[str]] = None,
+        external_task_group_id: Optional[str] = None,
         allowed_states: Optional[Iterable[str]] = None,
         failed_states: Optional[Iterable[str]] = None,
         execution_delta: Optional[datetime.timedelta] = None,
@@ -139,18 +147,25 @@ class ExternalTaskSensor(BaseSensorOperator):
         if external_task_id is not None:
             external_task_ids = [external_task_id]
 
-        if external_task_ids:
+        if external_task_group_id and external_task_ids:
+            raise ValueError(
+                "Values for `external_task_group_id` and `external_task_id` or `external_task_ids` "
+                "can't be set at the same time"
+            )
+
+        if external_task_ids or external_task_group_id:
             if not total_states <= set(State.task_states):
                 raise ValueError(
                     f'Valid values for `allowed_states` and `failed_states` '
-                    f'when `external_task_id` or `external_task_ids` is not `None`: {State.task_states}'
+                    f'when `external_task_id` or `external_task_ids` or `external_task_group_id` '
+                    f'is not `None`: {State.task_states}'
                 )
-            if len(external_task_ids) > len(set(external_task_ids)):
+            if external_task_ids and len(external_task_ids) > len(set(external_task_ids)):
                 raise ValueError('Duplicate task_ids passed in external_task_ids parameter')
         elif not total_states <= set(State.dag_states):
             raise ValueError(
                 f'Valid values for `allowed_states` and `failed_states` '
-                f'when `external_task_id` is `None`: {State.dag_states}'
+                f'when `external_task_id` and `external_task_group_id` is `None`: {State.dag_states}'
             )
 
         if execution_delta is not None and execution_date_fn is not None:
@@ -164,27 +179,39 @@ class ExternalTaskSensor(BaseSensorOperator):
         self.external_dag_id = external_dag_id
         self.external_task_id = external_task_id
         self.external_task_ids = external_task_ids
+        self.external_task_group_id = external_task_group_id
         self.check_existence = check_existence
         self._has_checked_existence = False
 
-    @provide_session
-    def poke(self, context, session=None):
+    def _get_dttm_filter(self, context):
         if self.execution_delta:
             dttm = context['logical_date'] - self.execution_delta
         elif self.execution_date_fn:
             dttm = self._handle_execution_date_fn(context=context)
         else:
             dttm = context['logical_date']
+        return dttm if isinstance(dttm, list) else [dttm]
 
-        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
+    @provide_session
+    def poke(self, context, session=None):
+        dttm_filter = self._get_dttm_filter(context)
         serialized_dttm_filter = ','.join(dt.isoformat() for dt in dttm_filter)
 
-        self.log.info(
-            'Poking for tasks %s in dag %s on %s ... ',
-            self.external_task_ids,
-            self.external_dag_id,
-            serialized_dttm_filter,
-        )
+        if self.external_task_ids:
+            self.log.info(
+                'Poking for tasks %s in dag %s on %s ... ',
+                self.external_task_ids,
+                self.external_dag_id,
+                serialized_dttm_filter,
+            )
+
+        if self.external_task_group_id:
+            self.log.info(
+                "Poking for task_group '%s' in dag '%s' on %s ... ",
+                self.external_task_group_id,
+                self.external_dag_id,
+                serialized_dttm_filter,
+            )
 
         # In poke mode this will check dag existence only once
         if self.check_existence and not self._has_checked_existence:
@@ -207,6 +234,17 @@ class ExternalTaskSensor(BaseSensorOperator):
                     f'Some of the external tasks {self.external_task_ids} '
                     f'in DAG {self.external_dag_id} failed.'
                 )
+            elif self.external_task_group_id:
+                if self.soft_fail:
+                    raise AirflowSkipException(
+                        f"The external task_group '{self.external_task_group_id}' "
+                        f"in DAG '{self.external_dag_id}' failed. Skipping due to soft_fail."
+                    )
+                raise AirflowException(
+                    f"The external task_group '{self.external_task_group_id}' "
+                    f"in DAG '{self.external_dag_id}' failed."
+                )
+
             else:
                 if self.soft_fail:
                     raise AirflowSkipException(
@@ -217,7 +255,7 @@ class ExternalTaskSensor(BaseSensorOperator):
         return count_allowed == len(dttm_filter)
 
     def _check_for_existence(self, session) -> None:
-        dag_to_wait = session.query(DagModel).filter(DagModel.dag_id == self.external_dag_id).first()
+        dag_to_wait = DagModel.get_current(self.external_dag_id, session)
 
         if not dag_to_wait:
             raise AirflowException(f'The external DAG {self.external_dag_id} does not exist.')
@@ -233,6 +271,15 @@ class ExternalTaskSensor(BaseSensorOperator):
                         f'The external task {external_task_id} in '
                         f'DAG {self.external_dag_id} does not exist.'
                     )
+
+        if self.external_task_group_id:
+            refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
+            if not refreshed_dag_info.has_task_group(self.external_task_group_id):
+                raise AirflowException(
+                    f"The external task group '{self.external_task_group_id}' in "
+                    f"DAG '{self.external_dag_id}' does not exist."
+                )
+
         self._has_checked_existence = True
 
     def get_count(self, dttm_filter, session, states) -> int:
@@ -251,28 +298,40 @@ class ExternalTaskSensor(BaseSensorOperator):
 
         if self.external_task_ids:
             count = (
-                session.query(func.count())  # .count() is inefficient
-                .filter(
-                    TI.dag_id == self.external_dag_id,
-                    TI.task_id.in_(self.external_task_ids),
-                    TI.state.in_(states),
-                    TI.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id.in_(self.external_task_ids))
                 .scalar()
-            )
-            count = count / len(self.external_task_ids)
-        else:
+            ) / len(self.external_task_ids)
+        elif self.external_task_group_id:
+            external_task_group_task_ids = self.get_external_task_group_task_ids(session)
             count = (
-                session.query(func.count())
-                .filter(
-                    DR.dag_id == self.external_dag_id,
-                    DR.state.in_(states),
-                    DR.execution_date.in_(dttm_filter),
-                )
+                self._count_query(TI, session, states, dttm_filter)
+                .filter(TI.task_id.in_(external_task_group_task_ids))
                 .scalar()
-            )
+            ) / len(external_task_group_task_ids)
+        else:
+            count = self._count_query(DR, session, states, dttm_filter).scalar()
         return count
 
+    def _count_query(self, model, session, states, dttm_filter) -> "Query":
+        query = session.query(func.count()).filter(
+            model.dag_id == self.external_dag_id,
+            model.state.in_(states),  # pylint: disable=no-member
+            model.execution_date.in_(dttm_filter),
+        )
+        return query
+
+    def get_external_task_group_task_ids(self, session):
+        refreshed_dag_info = DagBag(read_dags_from_db=True).get_dag(self.external_dag_id, session)
+        task_group = refreshed_dag_info.task_group_dict.get(self.external_task_group_id)
+
+        if task_group:
+            return [task.task_id for task in task_group]
+
+    # Return the group_id itself as the default task_id: this avoids any failure when
+    # 'check_existence=False', letting the sensor fail on timeout instead
+        return [self.external_task_group_id]
+
     def _handle_execution_date_fn(self, context) -> Any:
         """
         This function is to handle backwards compatibility with how this operator was
diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow/howto/operator/external_task_sensor.rst
index f6ae421969..923f8ec3d1 100644
--- a/docs/apache-airflow/howto/operator/external_task_sensor.rst
+++ b/docs/apache-airflow/howto/operator/external_task_sensor.rst
@@ -53,6 +53,16 @@ via ``allowed_states`` and ``failed_states`` parameters.
     :start-after: [START howto_operator_external_task_sensor]
     :end-before: [END howto_operator_external_task_sensor]
 
+ExternalTaskSensor with task_group dependency
+---------------------------------------------
+In addition, we can also use the :class:`~airflow.sensors.external_task.ExternalTaskSensor` to make tasks on a DAG
+wait for another ``task_group`` on a different DAG for a specific ``execution_date``.
+
+.. exampleinclude:: /../../airflow/example_dags/example_external_task_marker_dag.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_external_task_sensor_with_task_group]
+    :end-before: [END howto_operator_external_task_sensor_with_task_group]
 
 
 ExternalTaskMarker
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index c1505dc13d..501224bea4 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -25,6 +25,7 @@ from airflow import exceptions, settings
 from airflow.exceptions import AirflowException, AirflowSensorTimeout
 from airflow.models import DagBag, DagRun, TaskInstance
 from airflow.models.dag import DAG
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor, ExternalTaskSensorLink
@@ -32,6 +33,7 @@ from airflow.sensors.time_sensor import TimeSensor
 from airflow.serialization.serialized_objects import SerializedBaseOperator
 from airflow.utils.session import provide_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
+from airflow.utils.task_group import TaskGroup
 from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
 from tests.test_utils.db import clear_db_runs
@@ -40,6 +42,7 @@ DEFAULT_DATE = datetime(2015, 1, 1)
 TEST_DAG_ID = 'unit_test_dag'
 TEST_TASK_ID = 'time_sensor_check'
 TEST_TASK_ID_ALTERNATE = 'time_sensor_check_alternate'
+TEST_TASK_GROUP_ID = 'time_sensor_group_id'
 DEV_NULL = '/dev/null'
 
 
@@ -54,12 +57,24 @@ class TestExternalTaskSensor(unittest.TestCase):
         self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
         self.dag = DAG(TEST_DAG_ID, default_args=self.args)
 
-    def test_time_sensor(self, task_id=TEST_TASK_ID):
+    def add_time_sensor(self, task_id=TEST_TASK_ID):
         op = TimeSensor(task_id=task_id, target_time=time(0), dag=self.dag)
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def add_dummy_task_group(self, target_states=None):
+        target_states = [State.SUCCESS] * 2 if target_states is None else target_states
+        with self.dag as dag:
+            with TaskGroup(group_id=TEST_TASK_GROUP_ID) as task_group:
+                _ = [EmptyOperator(task_id=f"task{i}") for i in range(len(target_states))]
+            SerializedDagModel.write_dag(dag)
+
+        for idx, task in enumerate(task_group):
+            ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+            ti.run(ignore_ti_state=True, mark_success=True)
+            ti.set_state(target_states[idx])
+
     def test_external_task_sensor(self):
-        self.test_time_sensor()
+        self.add_time_sensor()
         op = ExternalTaskSensor(
             task_id='test_external_task_sensor_check',
             external_dag_id=TEST_DAG_ID,
@@ -69,8 +84,8 @@ class TestExternalTaskSensor(unittest.TestCase):
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_external_task_sensor_multiple_task_ids(self):
-        self.test_time_sensor(task_id=TEST_TASK_ID)
-        self.test_time_sensor(task_id=TEST_TASK_ID_ALTERNATE)
+        self.add_time_sensor(task_id=TEST_TASK_ID)
+        self.add_time_sensor(task_id=TEST_TASK_ID_ALTERNATE)
         op = ExternalTaskSensor(
             task_id='test_external_task_sensor_check_task_ids',
             external_dag_id=TEST_DAG_ID,
@@ -79,6 +94,75 @@ class TestExternalTaskSensor(unittest.TestCase):
         )
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_external_task_sensor_with_task_group(self):
+        self.add_time_sensor()
+        self.add_dummy_task_group()
+        op = ExternalTaskSensor(
+            task_id='test_external_task_sensor_task_group',
+            external_dag_id=TEST_DAG_ID,
+            external_task_group_id=TEST_TASK_GROUP_ID,
+            dag=self.dag,
+        )
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_raise_with_external_task_sensor_task_group_and_task_id(self):
+        with pytest.raises(ValueError) as ctx:
+            ExternalTaskSensor(
+                task_id='test_external_task_sensor_task_group_with_task_id_failed_status',
+                external_dag_id=TEST_DAG_ID,
+                external_task_ids=TEST_TASK_ID,
+                external_task_group_id=TEST_TASK_GROUP_ID,
+                dag=self.dag,
+            )
+        assert (
+            str(ctx.value) == "Values for `external_task_group_id` and `external_task_id` or "
+            "`external_task_ids` can't be set at the same time"
+        )
+
+    # By default (i.e. check_existence=False), if the task_group doesn't exist the sensor will run until
+    # timeout; this behaviour matches the case where external_task_id doesn't exist
+    def test_external_task_group_not_exists_without_check_existence(self):
+        self.add_time_sensor()
+        self.add_dummy_task_group()
+        with pytest.raises(AirflowException, match=f"Snap. Time is OUT. DAG id: {TEST_DAG_ID}"):
+            op = ExternalTaskSensor(
+                task_id='test_external_task_sensor_check',
+                external_dag_id=TEST_DAG_ID,
+                external_task_group_id='fake-task-group',
+                timeout=1,
+                dag=self.dag,
+            )
+            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_external_task_group_sensor_success(self):
+        self.add_time_sensor()
+        self.add_dummy_task_group()
+        op = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check',
+            external_dag_id=TEST_DAG_ID,
+            external_task_group_id=TEST_TASK_GROUP_ID,
+            failed_states=[State.FAILED],
+            dag=self.dag,
+        )
+        op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_external_task_group_sensor_failed_states(self):
+        ti_states = [State.FAILED, State.FAILED]
+        self.add_time_sensor()
+        self.add_dummy_task_group(ti_states)
+        op = ExternalTaskSensor(
+            task_id='test_external_task_sensor_check',
+            external_dag_id=TEST_DAG_ID,
+            external_task_group_id=TEST_TASK_GROUP_ID,
+            failed_states=[State.FAILED],
+            dag=self.dag,
+        )
+        with pytest.raises(
+            AirflowException,
+            match=f"The external task_group '{TEST_TASK_GROUP_ID}' in DAG '{TEST_DAG_ID}' failed.",
+        ):
+            op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
     def test_catch_overlap_allowed_failed_state(self):
         with pytest.raises(AirflowException):
             ExternalTaskSensor(
@@ -101,7 +185,7 @@ class TestExternalTaskSensor(unittest.TestCase):
             )
 
     def test_external_task_sensor_failed_states(self):
-        self.test_time_sensor()
+        self.add_time_sensor()
         op = ExternalTaskSensor(
             task_id='test_external_task_sensor_check',
             external_dag_id=TEST_DAG_ID,
@@ -112,7 +196,7 @@ class TestExternalTaskSensor(unittest.TestCase):
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_external_task_sensor_failed_states_as_success(self):
-        self.test_time_sensor()
+        self.add_time_sensor()
         op = ExternalTaskSensor(
             task_id='test_external_task_sensor_check',
             external_dag_id=TEST_DAG_ID,
@@ -135,7 +219,7 @@ class TestExternalTaskSensor(unittest.TestCase):
             )
 
     def test_external_task_sensor_soft_fail_failed_states_as_skipped(self, session=None):
-        self.test_time_sensor()
+        self.add_time_sensor()
         op = ExternalTaskSensor(
             task_id='test_external_task_sensor_check',
             external_dag_id=TEST_DAG_ID,
@@ -158,7 +242,7 @@ class TestExternalTaskSensor(unittest.TestCase):
 
     def test_external_task_sensor_external_task_id_param(self):
         """Test external_task_ids is set properly when external_task_id is passed as a template"""
-        self.test_time_sensor()
+        self.add_time_sensor()
         op = ExternalTaskSensor(
             task_id='test_external_task_sensor_check',
             external_dag_id='{{ params.dag_id }}',
@@ -176,7 +260,7 @@ class TestExternalTaskSensor(unittest.TestCase):
 
     def test_external_task_sensor_external_task_ids_param(self):
         """Test external_task_ids rendering when a template is passed."""
-        self.test_time_sensor()
+        self.add_time_sensor()
         op = ExternalTaskSensor(
             task_id='test_external_task_sensor_check',
             external_dag_id='{{ params.dag_id }}',
@@ -193,8 +277,8 @@ class TestExternalTaskSensor(unittest.TestCase):
             )
 
     def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self):
-        self.test_time_sensor(task_id=TEST_TASK_ID)
-        self.test_time_sensor(task_id=TEST_TASK_ID_ALTERNATE)
+        self.add_time_sensor(task_id=TEST_TASK_ID)
+        self.add_time_sensor(task_id=TEST_TASK_ID_ALTERNATE)
         op = ExternalTaskSensor(
             task_id='test_external_task_sensor_check_task_ids',
             external_dag_id=TEST_DAG_ID,
@@ -333,7 +417,7 @@ exit 0
             task_with_failure.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_external_task_sensor_delta(self):
-        self.test_time_sensor()
+        self.add_time_sensor()
         op = ExternalTaskSensor(
             task_id='test_external_task_sensor_check_delta',
             external_dag_id=TEST_DAG_ID,
@@ -345,7 +429,7 @@ exit 0
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_external_task_sensor_fn(self):
-        self.test_time_sensor()
+        self.add_time_sensor()
         # check that the execution_fn works
         op1 = ExternalTaskSensor(
             task_id='test_external_task_sensor_check_delta_1',
@@ -372,7 +456,7 @@ exit 0
 
     def test_external_task_sensor_fn_multiple_args(self):
         """Check this task sensor passes multiple args with full context. If no failure, means clean run."""
-        self.test_time_sensor()
+        self.add_time_sensor()
 
         def my_func(dt, context):
             assert context['logical_date'] == dt
@@ -390,7 +474,7 @@ exit 0
 
     def test_external_task_sensor_fn_kwargs(self):
         """Check this task sensor passes multiple args with full context. If no failure, means clean run."""
-        self.test_time_sensor()
+        self.add_time_sensor()
 
         def my_func(dt, ds_nodash, tomorrow_ds_nodash):
             assert ds_nodash == dt.strftime("%Y%m%d")
@@ -408,7 +492,7 @@ exit 0
         op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_external_task_sensor_error_delta_and_fn(self):
-        self.test_time_sensor()
+        self.add_time_sensor()
         # Test that providing execution_delta and a function raises an error
         with pytest.raises(ValueError):
             ExternalTaskSensor(
@@ -422,7 +506,7 @@ exit 0
             )
 
     def test_external_task_sensor_error_task_id_and_task_ids(self):
-        self.test_time_sensor()
+        self.add_time_sensor()
         # Test that providing both task_id and task_ids raises an error
         with pytest.raises(ValueError):
             ExternalTaskSensor(
@@ -435,7 +519,7 @@ exit 0
             )
 
     def test_catch_duplicate_task_ids(self):
-        self.test_time_sensor()
+        self.add_time_sensor()
         # Test By passing same task_id multiple times
         with pytest.raises(ValueError):
             ExternalTaskSensor(