You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/03/09 10:33:49 UTC
[airflow] 02/04: Migrate remaining core sensors tests to `pytest` (#28204)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit bb90f4afdc55129dca73b3e412f49c3b533d5d17
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Thu Dec 8 16:55:27 2022 +0400
Migrate remaining core sensors tests to `pytest` (#28204)
(cherry picked from commit e93b669d841828b5f912edf40f255299b1833521)
---
tests/sensors/test_date_time.py | 13 ++--
tests/sensors/test_external_task_sensor.py | 79 +++++++++++-----------
tests/sensors/test_time_sensor.py | 6 +-
tests/task/__init__.py | 4 --
.../task/{__init__.py => task_runner/conftest.py} | 24 ++++++-
5 files changed, 69 insertions(+), 57 deletions(-)
diff --git a/tests/sensors/test_date_time.py b/tests/sensors/test_date_time.py
index b82fbf3f07..a298300c54 100644
--- a/tests/sensors/test_date_time.py
+++ b/tests/sensors/test_date_time.py
@@ -20,7 +20,6 @@ from __future__ import annotations
from unittest.mock import patch
import pytest
-from parameterized import parameterized
from airflow.models.dag import DAG
from airflow.sensors.date_time import DateTimeSensor
@@ -35,7 +34,8 @@ class TestDateTimeSensor:
args = {"owner": "airflow", "start_date": DEFAULT_DATE}
cls.dag = DAG("test_dag", default_args=args)
- @parameterized.expand(
+ @pytest.mark.parametrize(
+ "task_id, target_time, expected",
[
(
"valid_datetime",
@@ -52,7 +52,7 @@ class TestDateTimeSensor:
"{{ ds }}",
"{{ ds }}",
),
- ]
+ ],
)
def test_valid_input(self, task_id, target_time, expected):
"""target_time should be a string as it is a template field"""
@@ -71,7 +71,8 @@ class TestDateTimeSensor:
dag=self.dag,
)
- @parameterized.expand(
+ @pytest.mark.parametrize(
+ "task_id, target_time, expected",
[
(
"poke_datetime",
@@ -80,12 +81,12 @@ class TestDateTimeSensor:
),
("poke_str_extended", "2020-01-01T23:00:00.001+00:00", False),
("poke_str_basic_with_tz", "20200102T065959+8", True),
- ]
+ ],
)
@patch(
"airflow.sensors.date_time.timezone.utcnow",
return_value=timezone.datetime(2020, 1, 1, 23, 0, tzinfo=timezone.utc),
)
- def test_poke(self, task_id, target_time, expected, mock_utcnow):
+ def test_poke(self, mock_utcnow, task_id, target_time, expected):
op = DateTimeSensor(task_id=task_id, target_time=target_time, dag=self.dag)
assert op.poke(None) == expected
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 1b5a5032cf..c02f9a8752 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -21,7 +21,6 @@ import hashlib
import logging
import os
import tempfile
-import unittest
import zipfile
from datetime import time, timedelta
@@ -91,8 +90,8 @@ def dag_zip_maker():
yield DagZipMaker()
-class TestExternalTaskSensor(unittest.TestCase):
- def setUp(self):
+class TestExternalTaskSensor:
+ def setup_method(self):
self.dagbag = DagBag(dag_folder=DEV_NULL, include_examples=True)
self.args = {"owner": "airflow", "start_date": DEFAULT_DATE}
self.dag = DAG(TEST_DAG_ID, default_args=self.args)
@@ -200,8 +199,9 @@ class TestExternalTaskSensor(unittest.TestCase):
task_id="test_external_task_sensor_check",
external_dag_id=TEST_DAG_ID,
external_task_group_id="fake-task-group",
- timeout=1,
+ timeout=0.001,
dag=self.dag,
+ poke_interval=0.1,
)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
@@ -266,7 +266,7 @@ class TestExternalTaskSensor(unittest.TestCase):
)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
- def test_external_task_sensor_failed_states_as_success(self):
+ def test_external_task_sensor_failed_states_as_success(self, caplog):
self.add_time_sensor()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
@@ -276,20 +276,16 @@ class TestExternalTaskSensor(unittest.TestCase):
failed_states=["success"],
dag=self.dag,
)
- with self.assertLogs(op.log, level=logging.INFO) as cm:
- with pytest.raises(AirflowException) as ctx:
+ error_message = rf"Some of the external tasks \['{TEST_TASK_ID}'\] in DAG {TEST_DAG_ID} failed\."
+ with pytest.raises(AirflowException, match=error_message):
+ with caplog.at_level(logging.INFO, logger=op.log.name):
+ caplog.clear()
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
- assert (
- f"INFO:airflow.task.operators:Poking for tasks ['time_sensor_check'] "
- f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
- )
- assert (
- str(ctx.value) == "Some of the external tasks "
- "['time_sensor_check'] in DAG "
- "unit_test_dag failed."
- )
+ assert (
+ f"Poking for tasks ['{TEST_TASK_ID}'] in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
+ ) in caplog.messages
- def test_external_task_sensor_soft_fail_failed_states_as_skipped(self, session=None):
+ def test_external_task_sensor_soft_fail_failed_states_as_skipped(self):
self.add_time_sensor()
op = ExternalTaskSensor(
task_id="test_external_task_sensor_check",
@@ -311,7 +307,7 @@ class TestExternalTaskSensor(unittest.TestCase):
assert len(task_instances) == 1, "Unexpected number of task instances"
assert task_instances[0].state == State.SKIPPED, "Unexpected external task state"
- def test_external_task_sensor_external_task_id_param(self):
+ def test_external_task_sensor_external_task_id_param(self, caplog):
"""Test external_task_ids is set properly when external_task_id is passed as a template"""
self.add_time_sensor()
op = ExternalTaskSensor(
@@ -322,14 +318,15 @@ class TestExternalTaskSensor(unittest.TestCase):
dag=self.dag,
)
- with self.assertLogs(op.log, level=logging.INFO) as cm:
+ with caplog.at_level(logging.INFO, logger=op.log.name):
+ caplog.clear()
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert (
- f"INFO:airflow.task.operators:Poking for tasks ['{TEST_TASK_ID}'] "
- f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
- )
+ f"Poking for tasks ['{TEST_TASK_ID}'] "
+ f"in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
+ ) in caplog.messages
- def test_external_task_sensor_external_task_ids_param(self):
+ def test_external_task_sensor_external_task_ids_param(self, caplog):
"""Test external_task_ids rendering when a template is passed."""
self.add_time_sensor()
op = ExternalTaskSensor(
@@ -340,14 +337,15 @@ class TestExternalTaskSensor(unittest.TestCase):
dag=self.dag,
)
- with self.assertLogs(op.log, level=logging.INFO) as cm:
+ with caplog.at_level(logging.INFO, logger=op.log.name):
+ caplog.clear()
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
assert (
- f"INFO:airflow.task.operators:Poking for tasks ['{TEST_TASK_ID}'] "
- f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
- )
+ f"Poking for tasks ['{TEST_TASK_ID}'] "
+ f"in dag {TEST_DAG_ID} on {DEFAULT_DATE.isoformat()} ... "
+ ) in caplog.messages
- def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self):
+ def test_external_task_sensor_failed_states_as_success_mulitple_task_ids(self, caplog):
self.add_time_sensor(task_id=TEST_TASK_ID)
self.add_time_sensor(task_id=TEST_TASK_ID_ALTERNATE)
op = ExternalTaskSensor(
@@ -358,19 +356,18 @@ class TestExternalTaskSensor(unittest.TestCase):
failed_states=["success"],
dag=self.dag,
)
- with self.assertLogs(op.log, level=logging.INFO) as cm:
- with pytest.raises(AirflowException) as ctx:
+ error_message = (
+ rf"Some of the external tasks \['{TEST_TASK_ID}'\, \'{TEST_TASK_ID_ALTERNATE}\'] "
+ rf"in DAG {TEST_DAG_ID} failed\."
+ )
+ with pytest.raises(AirflowException, match=error_message):
+ with caplog.at_level(logging.INFO, logger=op.log.name):
+ caplog.clear()
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
- assert (
- f"INFO:airflow.task.operators:Poking for tasks "
- f"['time_sensor_check', 'time_sensor_check_alternate'] "
- f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... " in cm.output
- )
- assert (
- str(ctx.value) == "Some of the external tasks "
- "['time_sensor_check', 'time_sensor_check_alternate'] in DAG "
- "unit_test_dag failed."
- )
+ assert (
+ f"Poking for tasks ['{TEST_TASK_ID}', '{TEST_TASK_ID_ALTERNATE}'] "
+ f"in dag unit_test_dag on {DEFAULT_DATE.isoformat()} ... "
+ ) in caplog.messages
def test_external_dag_sensor(self):
other_dag = DAG("other_dag", default_args=self.args, end_date=DEFAULT_DATE, schedule="@once")
@@ -767,7 +764,7 @@ def test_external_task_sensor_templated(dag_maker, app):
assert f"/dags/dag_{DEFAULT_DATE.date()}/grid" in url
-class TestExternalTaskMarker(unittest.TestCase):
+class TestExternalTaskMarker:
def test_serialized_fields(self):
assert {"recursion_depth"}.issubset(ExternalTaskMarker.get_serialized_fields())
diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py
index 06f2332625..70b875311c 100644
--- a/tests/sensors/test_time_sensor.py
+++ b/tests/sensors/test_time_sensor.py
@@ -23,7 +23,6 @@ from unittest.mock import patch
import freezegun
import pendulum
import pytest
-from parameterized import parameterized
from airflow.exceptions import TaskDeferred
from airflow.models.dag import DAG
@@ -41,12 +40,13 @@ DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_
return_value=timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc),
)
class TestTimeSensor:
- @parameterized.expand(
+ @pytest.mark.parametrize(
+ "default_timezone, start_date, expected",
[
("UTC", DEFAULT_DATE_WO_TZ, True),
("UTC", DEFAULT_DATE_WITH_TZ, False),
(DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False),
- ]
+ ],
)
def test_timezone(self, mock_utcnow, default_timezone, start_date, expected):
with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)):
diff --git a/tests/task/__init__.py b/tests/task/__init__.py
index 865dd751b4..217e5db960 100644
--- a/tests/task/__init__.py
+++ b/tests/task/__init__.py
@@ -15,7 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-# flake8: noqa
-from __future__ import annotations
-
-from .task_runner import *
diff --git a/tests/task/__init__.py b/tests/task/task_runner/conftest.py
similarity index 55%
copy from tests/task/__init__.py
copy to tests/task/task_runner/conftest.py
index 865dd751b4..3af87d1ed7 100644
--- a/tests/task/__init__.py
+++ b/tests/task/task_runner/conftest.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,7 +14,26 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-# flake8: noqa
+
from __future__ import annotations
-from .task_runner import *
+import logging
+from logging.config import dictConfig
+
+import pytest
+
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.listeners.listener import get_listener_manager
+
+
+@pytest.fixture(scope="module", autouse=True)
+def reset_to_default_logging():
+ """
+ Initializing ``BaseTaskRunner`` might have side effects on other tests.
+ This fixture resets logging back to the default after each module in this test package has run.
+ """
+ yield
+ airflow_logger = logging.getLogger("airflow")
+ airflow_logger.handlers = []
+ dictConfig(DEFAULT_LOGGING_CONFIG)
+ get_listener_manager().clear()