Posted to commits@airflow.apache.org by ep...@apache.org on 2023/03/09 10:33:49 UTC

[airflow] 02/04: Migrate remaining core sensors tests to `pytest` (#28204)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bb90f4afdc55129dca73b3e412f49c3b533d5d17
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Thu Dec 8 16:55:27 2022 +0400

    Migrate remaining core sensors tests to `pytest` (#28204)
    
    (cherry picked from commit e93b669d841828b5f912edf40f255299b1833521)
---
 tests/sensors/test_date_time.py                    | 13 ++--
 tests/sensors/test_external_task_sensor.py         | 79 +++++++++++-----------
 tests/sensors/test_time_sensor.py                  |  6 +-
 tests/task/__init__.py                             |  4 --
 .../task/{__init__.py => task_runner/conftest.py}  | 24 ++++++-
 5 files changed, 69 insertions(+), 57 deletions(-)
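
The bulk of this change swaps the third-party `parameterized` decorator for
pytest's built-in `pytest.mark.parametrize`. A minimal sketch of the mapping
(illustrative names, not code from this commit):

    import pytest

    # parameterized passed each tuple positionally:
    #
    #     @parameterized.expand([("case_a", 1), ("case_b", 2)])
    #     def test_example(self, name, value): ...
    #
    # pytest.mark.parametrize additionally takes the argument names up front:
    @pytest.mark.parametrize(
        "name, value",
        [
            ("case_a", 1),
            ("case_b", 2),
        ],
    )
    def test_example(name, value):
        assert isinstance(value, int)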

diff --git a/tests/sensors/test_date_time.py b/tests/sensors/test_date_time.py
index b82fbf3f07..a298300c54 100644
--- a/tests/sensors/test_date_time.py
+++ b/tests/sensors/test_date_time.py
@@ -20,7 +20,6 @@ from __future__ import annotations
 from unittest.mock import patch
 
 import pytest
-from parameterized import parameterized
 
 from airflow.models.dag import DAG
 from airflow.sensors.date_time import DateTimeSensor
@@ -35,7 +34,8 @@ class TestDateTimeSensor:
         args = {"owner": "airflow", "start_date": DEFAULT_DATE}
         cls.dag = DAG("test_dag", default_args=args)
 
-    @parameterized.expand(
+    @pytest.mark.parametrize(
+        "task_id, target_time, expected",
         [
             (
                 "valid_datetime",
@@ -52,7 +52,7 @@ class TestDateTimeSensor:
                 "{{ ds }}",
                 "{{ ds }}",
             ),
-        ]
+        ],
     )
     def test_valid_input(self, task_id, target_time, expected):
         """target_time should be a string as it is a template field"""
@@ -71,7 +71,8 @@ class TestDateTimeSensor:
                 dag=self.dag,
             )
 
-    @parameterized.expand(
+    @pytest.mark.parametrize(
+        "task_id, target_time, expected",
         [
             (
                 "poke_datetime",
@@ -80,12 +81,12 @@ class TestDateTimeSensor:
             ),
             ("poke_str_extended", "2020-01-01T23:00:00.001+00:00", False),
             ("poke_str_basic_with_tz", "20200102T065959+8", True),
-        ]
+        ],
     )
     @patch(
         "airflow.sensors.date_time.timezone.utcnow",
         return_value=timezone.datetime(2020, 1, 1, 23, 0, tzinfo=timezone.utc),
     )
-    def test_poke(self, task_id, target_time, expected, mock_utcnow):
+    def test_poke(self, mock_utcnow, task_id, target_time, expected):
         op = DateTimeSensor(task_id=task_id, target_time=target_time, dag=self.dag)
         assert op.poke(None) == expected
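
Note the reordered arguments in `test_poke` above: once `@patch` is combined
with `@pytest.mark.parametrize`, the injected mock must come directly after
`self`, because pytest passes parametrized values by keyword while
`mock.patch` hands the mock over positionally. A minimal sketch of the
pattern (the patch target and values are illustrative, not from this commit):

    from unittest import mock

    import pytest


    class TestOrdering:
        @pytest.mark.parametrize("value, expected", [(1, 2), (2, 3)])
        @mock.patch("os.getpid", return_value=42)  # illustrative target
        def test_increment(self, mock_getpid, value, expected):
            # mock.patch injects mock_getpid positionally, so it sits first
            # after self; pytest fills value/expected by keyword.
            assert value + 1 == expected
            assert mock_getpid.return_value == 42
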
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 1b5a5032cf..c02f9a8752 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -21,7 +21,6 @@ import hashlib
 import logging
 import os
 import tempfile
-import unittest
 import zipfile
 from datetime import time, timedelta
 
@@ -91,8 +90,8 @@ def dag_zip_maker():
     yield DagZipMaker()
 
 
-class TestExternalTaskSensor(unittest.TestCase):
-    def setUp(self):
+class TestExternalTaskSensor:
+    def setup_method(self):
         self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
         self.args = {"owner": "airflow", "start_date": DEFAULT_DATE}
         self.dag = DAG(TEST_DAG_ID, default_args=self.args)
@@ -200,8 +199,9 @@ class TestExternalTaskSensor(unittest.TestCase):
                 task_id="test_external_task_sensor_check",
                 external_dag_id=TEST_DAG_ID,
                 external_task_group_id="fake-task-group",
-                timeout=1,
+                timeout=0.001,
                 dag=self.dag,
+                poke_interval=0.1,
             )
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
@@ -266,7 +266,7 @@ class TestExternalTaskSensor(unittest.TestCase):
         )
         op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
-    def test_external_task_sensor_failed_states_as_success(self):
+    def test_external_task_sensor_failed_states_as_success(self, caplog):
         self.add_time_sensor()
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_check",
@@ -276,20 +276,16 @@ class TestExternalTaskSensor(unittest.TestCase):
             failed_states=["success"],
             dag=self.dag,
         )
-        with self.assertLogs(op.log, level=logging.INFO) as cm:
-            with pytest.raises(AirflowException) as ctx:
+        error_message = rf"Some of the external tasks \['{TEST_TASK_ID}'\] in DAG {TEST_DAG_ID} failed\."
+        with pytest.raises(AirflowException, match=error_message):
+            with caplog.at_level(logging.INFO, logger=op.log.name):
+                caplog.clear()
                 op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-            assert (
-                f"INFO:airflow.task.operators:Poking for tasks ['time_sensor_check'] "
-                f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
-            )
-            assert (
-                str(ctx.value) == "Some of the external tasks "
-                "['time_sensor_check'] in DAG "
-                "unit_test_dag failed."
-            )
+        assert (
+            f"Poking for tasks ['{TEST_TASK_ID}'] in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
+        ) in caplog.messages
 
-    def test_external_task_sensor_soft_fail_failed_states_as_skipped(self, session=None):
+    def test_external_task_sensor_soft_fail_failed_states_as_skipped(self):
         self.add_time_sensor()
         op = ExternalTaskSensor(
             task_id="test_external_task_sensor_check",
@@ -311,7 +307,7 @@ class TestExternalTaskSensor(unittest.TestCase):
         assert len(task_instances) == 1, "Unexpected number of task instances"
         assert task_instances[0].state == State.SKIPPED, "Unexpected external task state"
 
-    def test_external_task_sensor_external_task_id_param(self):
+    def test_external_task_sensor_external_task_id_param(self, caplog):
         """Test external_task_ids is set properly when external_task_id is passed as a template"""
         self.add_time_sensor()
         op = ExternalTaskSensor(
@@ -322,14 +318,15 @@ class TestExternalTaskSensor(unittest.TestCase):
             dag=self.dag,
         )
 
-        with self.assertLogs(op.log, level=logging.INFO) as cm:
+        with caplog.at_level(logging.INFO, logger=op.log.name):
+            caplog.clear()
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
             assert (
-                f"INFO:airflow.task.operators:Poking for tasks ['{TEST_TASK_ID}'] "
-                f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
-            )
+                f"Poking for tasks ['{TEST_TASK_ID}'] "
+                f"in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
+            ) in caplog.messages
 
-    def test_external_task_sensor_external_task_ids_param(self):
+    def test_external_task_sensor_external_task_ids_param(self, caplog):
         """Test external_task_ids rendering when a template is passed."""
         self.add_time_sensor()
         op = ExternalTaskSensor(
@@ -340,14 +337,15 @@ class TestExternalTaskSensor(unittest.TestCase):
             dag=self.dag,
         )
 
-        with self.assertLogs(op.log, level=logging.INFO) as cm:
+        with caplog.at_level(logging.INFO, logger=op.log.name):
+            caplog.clear()
             op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
             assert (
-                f"INFO:airflow.task.operators:Poking for tasks ['{TEST_TASK_ID}'] "
-                f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
-            )
+                f"Poking for tasks ['{TEST_TASK_ID}'] "
+                f"in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
+            ) in caplog.messages
 
-    def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self):
+    def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self, caplog):
         self.add_time_sensor(task_id=TEST_TASK_ID)
         self.add_time_sensor(task_id=TEST_TASK_ID_ALTERNATE)
         op = ExternalTaskSensor(
@@ -358,19 +356,18 @@ class TestExternalTaskSensor(unittest.TestCase):
             failed_states=["success"],
             dag=self.dag,
         )
-        with self.assertLogs(op.log, level=logging.INFO) as cm:
-            with pytest.raises(AirflowException) as ctx:
+        error_message = (
+            rf"Some of the external tasks \['{TEST_TASK_ID}'\, \'{TEST_TASK_ID_ALTERNATE}\'] "
+            rf"in DAG {TEST_DAG_ID} failed\."
+        )
+        with pytest.raises(AirflowException, match=error_message):
+            with caplog.at_level(logging.INFO, logger=op.log.name):
+                caplog.clear()
                 op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-            assert (
-                f"INFO:airflow.task.operators:Poking for tasks "
-                f"['time_sensor_check', 'time_sensor_check_alternate'] "
-                f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
-            )
-            assert (
-                str(ctx.value) == "Some of the external tasks "
-                "['time_sensor_check', 'time_sensor_check_alternate'] in DAG "
-                "unit_test_dag failed."
-            )
+        assert (
+            f"Poking for tasks ['{TEST_TASK_ID}', '{TEST_TASK_ID_ALTERNATE}'] "
+            f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... "
+        ) in caplog.messages
 
     def test_external_dag_sensor(self):
         other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
@@ -767,7 +764,7 @@ def test_external_task_sensor_templated(dag_maker, app):
         assert f"/dags/dag_{DEFAULT_DATE.date()}/grid" in url
 
 
-class TestExternalTaskMarker(unittest.TestCase):
+class TestExternalTaskMarker:
     def test_serialized_fields(self):
         assert {"recursion_depth"}.issubset(ExternalTaskMarker.get_serialized_fields())
 
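The hunks above also replace unittest's `assertLogs` with pytest's built-in
`caplog` fixture and switch exception-message checks to
`pytest.raises(..., match=...)`. A minimal sketch of that pattern
(illustrative logger and messages, not code from this commit):

    import logging

    import pytest


    def do_work(logger: logging.Logger) -> None:
        logger.info("Poking ...")
        raise RuntimeError("external task failed")


    def test_logs_and_raises(caplog):
        logger = logging.getLogger("example.task")
        with pytest.raises(RuntimeError, match="external task failed"):
            with caplog.at_level(logging.INFO, logger=logger.name):
                caplog.clear()  # drop records captured before this point
                do_work(logger)
        # caplog.messages holds the formatted message strings
        assert "Poking ..." in caplog.messages
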
diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py
index 06f2332625..70b875311c 100644
--- a/tests/sensors/test_time_sensor.py
+++ b/tests/sensors/test_time_sensor.py
@@ -23,7 +23,6 @@ from unittest.mock import patch
 import freezegun
 import pendulum
 import pytest
-from parameterized import parameterized
 
 from airflow.exceptions import TaskDeferred
 from airflow.models.dag import DAG
@@ -41,12 +40,13 @@ DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_
     return_value=timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc),
 )
 class TestTimeSensor:
-    @parameterized.expand(
+    @pytest.mark.parametrize(
+        "default_timezone, start_date, expected",
         [
             ("UTC", DEFAULT_DATE_WO_TZ, True),
             ("UTC", DEFAULT_DATE_WITH_TZ, False),
             (DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False),
-        ]
+        ],
     )
     def test_timezone(self, mock_utcnow, default_timezone, start_date, expected):
         with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)):
diff --git a/tests/task/__init__.py b/tests/task/__init__.py
index 865dd751b4..217e5db960 100644
--- a/tests/task/__init__.py
+++ b/tests/task/__init__.py
@@ -15,7 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# flake8: noqa
-from __future__ import annotations
-
-from .task_runner import *
diff --git a/tests/task/__init__.py b/tests/task/task_runner/conftest.py
similarity index 55%
copy from tests/task/__init__.py
copy to tests/task/task_runner/conftest.py
index 865dd751b4..3af87d1ed7 100644
--- a/tests/task/__init__.py
+++ b/tests/task/task_runner/conftest.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,7 +14,26 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# flake8: noqa
+
 from __future__ import annotations
 
-from .task_runner import *
+import logging
+from logging.config import dictConfig
+
+import pytest
+
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.listeners.listener import get_listener_manager
+
+
+@pytest.fixture(scope="module", autouse=True)
+def reset_to_default_logging():
+    """
+    Initializing ``BaseTaskRunner`` might have side effects on other tests.
+    This fixture resets logging back to the default after each module in this test package runs.
+    """
+    yield
+    airflow_logger = logging.getLogger("airflow")
+    airflow_logger.handlers = []
+    dictConfig(DEFAULT_LOGGING_CONFIG)
+    get_listener_manager().clear()
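
For context, a module-scoped autouse fixture such as the one added above
applies to every test in its module without being requested explicitly, and
the code after ``yield`` runs once as teardown when the module's last test
finishes. A minimal illustration (hypothetical fixture, not from this
commit):

    import pytest


    @pytest.fixture(scope="module", autouse=True)
    def restore_state():
        # Nothing happens before the tests; the fixture exists only for
        # its teardown.
        yield
        # Runs once, after the final test in this module completes.
        print("module finished, restoring state")


    def test_example():
        assert True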