You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/03/08 13:17:57 UTC

[airflow] 02/12: Replace freezegun with time-machine (#28193)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2ecbfd0118ac1db9bf87ed9a44853b11d6b972f0
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Mon Dec 12 17:38:03 2022 +0000

    Replace freezegun with time-machine (#28193)
    
    The primary driver for this was a niggle that the durations output for
    one test was reporting over 52 years:
    
    > 1670340356.40s call     tests/jobs/test_base_job.py::TestBaseJob::test_heartbeat
    
    It turns out this was caused by freezegun; time-machine fixes this.
    It also might be a bit faster, but that isn't a noticeable difference for
    us. (There is no runtime difference for the changed files, but it does make
    collection quicker: from 10s to 8s.)
    
    (cherry picked from commit 4d0fd8ef6adc35f683c7561f05688a65fd7451f4)
---
 setup.py                                           |   3 +-
 tests/api/client/test_local_client.py              |   4 +-
 .../endpoints/test_dag_run_endpoint.py             |   4 +-
 tests/api_connexion/schemas/test_dataset_schema.py |   4 +-
 tests/conftest.py                                  |  20 ++--
 tests/core/test_sentry.py                          |   4 +-
 tests/dag_processing/test_manager.py               |   6 +-
 tests/executors/test_celery_executor.py            |   6 +-
 tests/jobs/test_scheduler_job.py                   |   4 +-
 tests/models/test_dag.py                           |   6 +-
 tests/models/test_dagbag.py                        |  14 +--
 tests/models/test_taskinstance.py                  |  18 ++--
 tests/models/test_timestamp.py                     |   6 +-
 tests/operators/test_datetime.py                   |  16 +--
 tests/operators/test_latest_only_operator.py       |   4 +-
 tests/operators/test_weekday.py                    |  10 +-
 tests/providers/amazon/aws/hooks/test_eks.py       |  10 +-
 .../amazon/aws/sensors/test_s3_keys_unchanged.py   |  24 ++---
 .../amazon/aws/utils/test_eks_get_token.py         |   4 +-
 .../elasticsearch/log/test_es_task_handler.py      |  15 ++-
 .../test_cloud_storage_transfer_service.py         |   4 +-
 tests/sensors/test_base.py                         | 107 ++++++++++-----------
 tests/sensors/test_time_sensor.py                  |  11 +--
 tests/ti_deps/deps/test_not_in_retry_period_dep.py |   6 +-
 tests/ti_deps/deps/test_runnable_exec_date_dep.py  |   6 +-
 tests/timetables/test_interval_timetable.py        |   8 +-
 tests/timetables/test_trigger_timetable.py         |   6 +-
 tests/utils/log/test_file_processor_handler.py     |   8 +-
 tests/utils/test_serve_logs.py                     |  14 +--
 tests/www/test_security.py                         |  10 +-
 tests/www/views/test_views_grid.py                 |  23 +++--
 tests/www/views/test_views_tasks.py                |  17 ++--
 32 files changed, 194 insertions(+), 208 deletions(-)

diff --git a/setup.py b/setup.py
index 5ed4bb756f..a132eb861c 100644
--- a/setup.py
+++ b/setup.py
@@ -339,7 +339,6 @@ mypy_dependencies = [
     "types-croniter",
     "types-Deprecated",
     "types-docutils",
-    "types-freezegun",
     "types-paramiko",
     "types-protobuf",
     "types-python-dateutil",
@@ -371,7 +370,6 @@ devel_only = [
     "flake8-colors",
     "flake8-implicit-str-concat",
     "flaky",
-    "freezegun",
     "gitpython",
     "ipdb",
     # make sure that we are using stable sorting order from 5.* version (some changes were introduced
@@ -403,6 +401,7 @@ devel_only = [
     "requests_mock",
     "rich-click>=1.5",
     "semver",
+    "time-machine",
     "towncrier",
     "twine",
     "wheel",
diff --git a/tests/api/client/test_local_client.py b/tests/api/client/test_local_client.py
index 70188ba5e1..d079f6c510 100644
--- a/tests/api/client/test_local_client.py
+++ b/tests/api/client/test_local_client.py
@@ -24,7 +24,7 @@ from unittest.mock import patch
 
 import pendulum
 import pytest
-from freezegun import freeze_time
+import time_machine
 
 from airflow.api.client.local_client import Client
 from airflow.example_dags import example_bash_operator
@@ -72,7 +72,7 @@ class TestLocalClient:
             run_after=pendulum.instance(EXECDATE_NOFRACTIONS)
         )
 
-        with freeze_time(EXECDATE):
+        with time_machine.travel(EXECDATE, tick=False):
             # no execution date, execution date should be set automatically
 
             self.client.trigger_dag(dag_id=test_dag_id)
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 93c9c6a1d0..a80e8b9f4a 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -20,7 +20,7 @@ from datetime import timedelta
 from unittest import mock
 
 import pytest
-from freezegun import freeze_time
+import time_machine
 from parameterized import parameterized
 
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
@@ -1350,7 +1350,7 @@ class TestPatchDagRunState(TestDagRunEndpoint):
         }
 
     @pytest.mark.parametrize("invalid_state", ["running"])
-    @freeze_time(TestDagRunEndpoint.default_time)
+    @time_machine.travel(TestDagRunEndpoint.default_time)
     def test_should_response_400_for_non_existing_dag_run_state(self, invalid_state, dag_maker):
         dag_id = "TEST_DAG_ID"
         dag_run_id = "TEST_DAG_RUN_ID"
diff --git a/tests/api_connexion/schemas/test_dataset_schema.py b/tests/api_connexion/schemas/test_dataset_schema.py
index 5147768aed..85deb129f3 100644
--- a/tests/api_connexion/schemas/test_dataset_schema.py
+++ b/tests/api_connexion/schemas/test_dataset_schema.py
@@ -16,7 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-from freezegun import freeze_time
+import time_machine
 
 from airflow.api_connexion.schemas.dataset_schema import (
     DatasetCollection,
@@ -37,7 +37,7 @@ class TestDatasetSchemaBase:
         clear_db_dags()
         clear_db_datasets()
         self.timestamp = "2022-06-10T12:02:44+00:00"
-        self.freezer = freeze_time(self.timestamp)
+        self.freezer = time_machine.travel(self.timestamp, tick=False)
         self.freezer.start()
 
     def teardown_method(self) -> None:
diff --git a/tests/conftest.py b/tests/conftest.py
index bdaec7da0f..74d412f849 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -24,8 +24,8 @@ from contextlib import ExitStack, suppress
 from datetime import datetime, timedelta
 from typing import TYPE_CHECKING
 
-import freezegun
 import pytest
+import time_machine
 
 # We should set these before loading _any_ of the rest of airflow so that the
 # unit test mode config is set as early as possible.
@@ -400,7 +400,7 @@ def pytest_runtest_setup(item):
 @pytest.fixture
 def frozen_sleep(monkeypatch):
     """
-    Use freezegun to "stub" sleep, so that it takes no time, but that
+    Use time-machine to "stub" sleep, so that it takes no time, but that
     ``datetime.now()`` appears to move forwards
 
     If your module under test does ``import time`` and then ``time.sleep``::
@@ -416,21 +416,21 @@ def frozen_sleep(monkeypatch):
             monkeypatch.setattr('my_mod.sleep', frozen_sleep)
             my_mod.fn_under_test()
     """
-    freezegun_control = None
+    traveller = None
 
     def fake_sleep(seconds):
-        nonlocal freezegun_control
+        nonlocal traveller
         utcnow = datetime.utcnow()
-        if freezegun_control is not None:
-            freezegun_control.stop()
-        freezegun_control = freezegun.freeze_time(utcnow + timedelta(seconds=seconds))
-        freezegun_control.start()
+        if traveller is not None:
+            traveller.stop()
+        traveller = time_machine.travel(utcnow + timedelta(seconds=seconds))
+        traveller.start()
 
     monkeypatch.setattr("time.sleep", fake_sleep)
     yield fake_sleep
 
-    if freezegun_control is not None:
-        freezegun_control.stop()
+    if traveller is not None:
+        traveller.stop()
 
 
 @pytest.fixture(scope="session")
diff --git a/tests/core/test_sentry.py b/tests/core/test_sentry.py
index 2b29a1c704..0e8607f2ad 100644
--- a/tests/core/test_sentry.py
+++ b/tests/core/test_sentry.py
@@ -22,7 +22,7 @@ import importlib
 from unittest import mock
 
 import pytest
-from freezegun import freeze_time
+import time_machine
 from sentry_sdk import configure_scope
 
 from airflow.operators.python import PythonOperator
@@ -129,7 +129,7 @@ class TestSentryHook:
             for key, value in scope._tags.items():
                 assert TEST_SCOPE[key] == value
 
-    @freeze_time(CRUMB_DATE.isoformat())
+    @time_machine.travel(CRUMB_DATE)
     def test_add_breadcrumbs(self, sentry, task_instance):
         """
         Test adding breadcrumbs.
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 1cbfb7275d..862dfdb2c8 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -34,7 +34,7 @@ from unittest import mock
 from unittest.mock import MagicMock, PropertyMock
 
 import pytest
-from freezegun import freeze_time
+import time_machine
 from sqlalchemy import func
 
 from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest, SlaCallbackRequest
@@ -470,7 +470,7 @@ class TestDagFileProcessorManager:
         manager._file_stats = {
             "file_1.py": DagFileStat(1, 0, last_finish_time, timedelta(seconds=1.0), 1),
         }
-        with freeze_time(freezed_base_time):
+        with time_machine.travel(freezed_base_time):
             manager.set_file_paths(dag_files)
             assert manager._file_path_queue == collections.deque()
             # File Path Queue will be empty as the "modified time" < "last finish time"
@@ -481,7 +481,7 @@ class TestDagFileProcessorManager:
         # than the last_parse_time but still less than now - min_file_process_interval
         file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
         file_1_new_mtime_ts = file_1_new_mtime.timestamp()
-        with freeze_time(freezed_base_time):
+        with time_machine.travel(freezed_base_time):
             manager.set_file_paths(dag_files)
             assert manager._file_path_queue == collections.deque()
             # File Path Queue will be empty as the "modified time" < "last finish time"
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index cbbe64c564..2a52776dec 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -27,9 +27,9 @@ from unittest import mock
 # leave this it is used by the test worker
 import celery.contrib.testing.tasks  # noqa: F401
 import pytest
+import time_machine
 from celery import Celery
 from celery.result import AsyncResult
-from freezegun import freeze_time
 from kombu.asynchronous import set_event_loop
 from parameterized import parameterized
 
@@ -162,7 +162,7 @@ class TestCeleryExecutor:
         assert executor.try_adopt_task_instances(tis) == tis
 
     @pytest.mark.backend("mysql", "postgres")
-    @freeze_time("2020-01-01")
+    @time_machine.travel("2020-01-01", tick=False)
     def test_try_adopt_task_instances(self):
         start_date = timezone.utcnow() - timedelta(days=2)
 
@@ -270,7 +270,7 @@ class TestCeleryExecutor:
         assert ti.external_executor_id is None
 
     @pytest.mark.backend("mysql", "postgres")
-    @freeze_time("2020-01-01")
+    @time_machine.travel("2020-01-01", tick=False)
     def test_pending_tasks_timeout_with_appropriate_config_setting(self):
         start_date = timezone.utcnow() - timedelta(days=2)
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b661293ea2..9d1b4dd452 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -30,7 +30,7 @@ from unittest.mock import MagicMock, patch
 
 import psutil
 import pytest
-from freezegun import freeze_time
+import time_machine
 from sqlalchemy import func
 
 import airflow.example_dags
@@ -3290,7 +3290,7 @@ class TestSchedulerJob:
 
         assert dag3.get_last_dagrun().creating_job_id == self.scheduler_job.id
 
-    @freeze_time(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9))
+    @time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9), tick=False)
     @mock.patch("airflow.jobs.scheduler_job.Stats.timing")
     def test_start_dagruns(self, stats_timing, dag_maker):
         """
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 855ecc4b6f..6fc846f20a 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -34,8 +34,8 @@ from unittest.mock import patch
 import jinja2
 import pendulum
 import pytest
+import time_machine
 from dateutil.relativedelta import relativedelta
-from freezegun import freeze_time
 from sqlalchemy import inspect
 
 import airflow
@@ -2122,7 +2122,7 @@ my_postgres_conn:
         # The DR should be scheduled in the last 2 hours, not 6 hours ago
         assert next_date == six_hours_ago_to_the_hour
 
-    @freeze_time(timezone.datetime(2020, 1, 5))
+    @time_machine.travel(timezone.datetime(2020, 1, 5), tick=False)
     def test_next_dagrun_info_timedelta_schedule_and_catchup_false(self):
         """
         Test that the dag file processor does not create multiple dagruns
@@ -2142,7 +2142,7 @@ my_postgres_conn:
         next_info = dag.next_dagrun_info(next_info.data_interval)
         assert next_info and next_info.logical_date == timezone.datetime(2020, 1, 5)
 
-    @freeze_time(timezone.datetime(2020, 5, 4))
+    @time_machine.travel(timezone.datetime(2020, 5, 4))
     def test_next_dagrun_info_timedelta_schedule_and_catchup_true(self):
         """
         Test that the dag file processor creates multiple dagruns
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 102ad0b518..dbeaba5dd1 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -32,7 +32,7 @@ from unittest import mock
 from unittest.mock import patch
 
 import pytest
-from freezegun import freeze_time
+import time_machine
 from sqlalchemy import func
 from sqlalchemy.exc import OperationalError
 
@@ -879,7 +879,7 @@ class TestDagBag:
         """
         db_clean_up()
         session = settings.Session()
-        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)) as frozen_time:
+        with time_machine.travel(tz.datetime(2020, 1, 5, 0, 0, 0), tick=False) as frozen_time:
             dagbag = DagBag(
                 dag_folder=os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py"),
                 include_examples=False,
@@ -889,7 +889,7 @@ class TestDagBag:
 
             def _sync_to_db():
                 mock_sync_perm_for_dag.reset_mock()
-                frozen_time.tick(20)
+                frozen_time.shift(20)
                 dagbag.sync_to_db(session=session)
 
             dag = dagbag.dags["test_example_bash_operator"]
@@ -950,7 +950,7 @@ class TestDagBag:
         Serialized DAG table after 'min_serialized_dag_fetch_interval' seconds are passed.
         """
 
-        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 0)):
+        with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 0)), tick=False):
             example_bash_op_dag = DagBag(include_examples=True).dags.get("example_bash_operator")
             SerializedDagModel.write_dag(dag=example_bash_op_dag)
 
@@ -962,18 +962,18 @@ class TestDagBag:
 
         # Check that if min_serialized_dag_fetch_interval has not passed we do not fetch the DAG
         # from DB
-        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 4)):
+        with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 4)), tick=False):
             with assert_queries_count(0):
                 assert dag_bag.get_dag("example_bash_operator").tags == ["example", "example2"]
 
         # Make a change in the DAG and write Serialized DAG to the DB
-        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 6)):
+        with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 6)), tick=False):
             example_bash_op_dag.tags += ["new_tag"]
             SerializedDagModel.write_dag(dag=example_bash_op_dag)
 
         # Since min_serialized_dag_fetch_interval is passed verify that calling 'dag_bag.get_dag'
         # fetches the Serialized DAG from DB
-        with freeze_time(tz.datetime(2020, 1, 5, 0, 0, 8)):
+        with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 8)), tick=False):
             with assert_queries_count(2):
                 updated_ser_dag_1 = dag_bag.get_dag("example_bash_operator")
                 updated_ser_dag_1_update_time = dag_bag.dags_last_fetched["example_bash_operator"]
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 80b6d6d302..9bcfe5f677 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -33,7 +33,7 @@ from uuid import uuid4
 
 import pendulum
 import pytest
-from freezegun import freeze_time
+import time_machine
 
 from airflow import models, settings
 from airflow.decorators import task, task_group
@@ -564,11 +564,11 @@ class TestTaskInstance:
         assert ti.next_kwargs is None
         assert ti.state == state
 
-    @freeze_time("2021-09-19 04:56:35", as_kwarg="frozen_time")
-    def test_retry_delay(self, dag_maker, frozen_time=None):
+    def test_retry_delay(self, dag_maker, time_machine):
         """
         Test that retry delays are respected
         """
+        time_machine.move_to("2021-09-19 04:56:35", tick=False)
         with dag_maker(dag_id="test_retry_handling"):
             task = BashOperator(
                 task_id="test_retry_handling_op",
@@ -593,12 +593,12 @@ class TestTaskInstance:
         assert ti.try_number == 2
 
         # second run -- still up for retry because retry_delay hasn't expired
-        frozen_time.tick(delta=datetime.timedelta(seconds=3))
+        time_machine.coordinates.shift(3)
         run_with_error(ti)
         assert ti.state == State.UP_FOR_RETRY
 
         # third run -- failed
-        frozen_time.tick(delta=datetime.datetime.resolution)
+        time_machine.coordinates.shift(datetime.datetime.resolution)
         run_with_error(ti)
         assert ti.state == State.FAILED
 
@@ -756,7 +756,7 @@ class TestTaskInstance:
             expected_try_number,
             expected_task_reschedule_count,
         ):
-            with freeze_time(run_date):
+            with time_machine.travel(run_date, tick=False):
                 try:
                     ti.run()
                 except AirflowException:
@@ -856,7 +856,7 @@ class TestTaskInstance:
             expected_task_reschedule_count,
         ):
             ti.refresh_from_task(task)
-            with freeze_time(run_date):
+            with time_machine.travel(run_date, tick=False):
                 try:
                     ti.run()
                 except AirflowException:
@@ -955,7 +955,7 @@ class TestTaskInstance:
             expected_task_reschedule_count,
         ):
             ti.refresh_from_task(task)
-            with freeze_time(run_date):
+            with time_machine.travel(run_date, tick=False):
                 try:
                     ti.run()
                 except AirflowException:
@@ -1023,7 +1023,7 @@ class TestTaskInstance:
             expected_try_number,
             expected_task_reschedule_count,
         ):
-            with freeze_time(run_date):
+            with time_machine.travel(run_date, tick=False):
                 try:
                     ti.run()
                 except AirflowException:
diff --git a/tests/models/test_timestamp.py b/tests/models/test_timestamp.py
index 0313183f12..2315e25dd4 100644
--- a/tests/models/test_timestamp.py
+++ b/tests/models/test_timestamp.py
@@ -18,7 +18,7 @@ from __future__ import annotations
 
 import pendulum
 import pytest
-from freezegun import freeze_time
+import time_machine
 
 from airflow.models import Log, TaskInstance
 from airflow.operators.empty import EmptyOperator
@@ -53,7 +53,7 @@ def add_log(execdate, session, dag_maker, timezone_override=None):
 @provide_session
 def test_timestamp_behaviour(dag_maker, session=None):
     execdate = timezone.utcnow()
-    with freeze_time(execdate):
+    with time_machine.travel(execdate, tick=False):
         current_time = timezone.utcnow()
         old_log = add_log(execdate, session, dag_maker)
         session.expunge(old_log)
@@ -65,7 +65,7 @@ def test_timestamp_behaviour(dag_maker, session=None):
 @provide_session
 def test_timestamp_behaviour_with_timezone(dag_maker, session=None):
     execdate = timezone.utcnow()
-    with freeze_time(execdate):
+    with time_machine.travel(execdate, tick=False):
         current_time = timezone.utcnow()
         old_log = add_log(execdate, session, dag_maker, timezone_override=pendulum.timezone("Europe/Warsaw"))
         session.expunge(old_log)
diff --git a/tests/operators/test_datetime.py b/tests/operators/test_datetime.py
index 242442e8ea..027c4c1f0f 100644
--- a/tests/operators/test_datetime.py
+++ b/tests/operators/test_datetime.py
@@ -20,8 +20,8 @@ from __future__ import annotations
 import datetime
 import unittest
 
-import freezegun
 import pytest
+import time_machine
 
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun, TaskInstance as TI
@@ -113,7 +113,7 @@ class TestBranchDateTimeOperator(unittest.TestCase):
                 dag=self.dag,
             )
 
-    @freezegun.freeze_time("2020-07-07 10:54:05")
+    @time_machine.travel("2020-07-07 10:54:05")
     def test_branch_datetime_operator_falls_within_range(self):
         """Check BranchDateTimeOperator branch operation"""
         for target_lower, target_upper in self.targets:
@@ -143,7 +143,7 @@ class TestBranchDateTimeOperator(unittest.TestCase):
                 self.branch_op.target_upper = target_upper
 
                 for date in dates:
-                    with freezegun.freeze_time(date):
+                    with time_machine.travel(date):
                         self.branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
                         self._assert_task_ids_match_states(
@@ -154,7 +154,7 @@ class TestBranchDateTimeOperator(unittest.TestCase):
                             }
                         )
 
-    @freezegun.freeze_time("2020-07-07 10:54:05")
+    @time_machine.travel("2020-07-07 10:54:05")
     def test_branch_datetime_operator_upper_comparison_within_range(self):
         """Check BranchDateTimeOperator branch operation"""
         for _, target_upper in self.targets:
@@ -172,7 +172,7 @@ class TestBranchDateTimeOperator(unittest.TestCase):
                     }
                 )
 
-    @freezegun.freeze_time("2020-07-07 10:54:05")
+    @time_machine.travel("2020-07-07 10:54:05")
     def test_branch_datetime_operator_lower_comparison_within_range(self):
         """Check BranchDateTimeOperator branch operation"""
         for target_lower, _ in self.targets:
@@ -190,7 +190,7 @@ class TestBranchDateTimeOperator(unittest.TestCase):
                     }
                 )
 
-    @freezegun.freeze_time("2020-07-07 12:00:00")
+    @time_machine.travel("2020-07-07 12:00:00")
     def test_branch_datetime_operator_upper_comparison_outside_range(self):
         """Check BranchDateTimeOperator branch operation"""
         for _, target_upper in self.targets:
@@ -208,7 +208,7 @@ class TestBranchDateTimeOperator(unittest.TestCase):
                     }
                 )
 
-    @freezegun.freeze_time("2020-07-07 09:00:00")
+    @time_machine.travel("2020-07-07 09:00:00")
     def test_branch_datetime_operator_lower_comparison_outside_range(self):
         """Check BranchDateTimeOperator branch operation"""
         for target_lower, _ in self.targets:
@@ -226,7 +226,7 @@ class TestBranchDateTimeOperator(unittest.TestCase):
                     }
                 )
 
-    @freezegun.freeze_time("2020-12-01 09:00:00")
+    @time_machine.travel("2020-12-01 09:00:00")
     def test_branch_datetime_operator_use_task_logical_date(self):
         """Check if BranchDateTimeOperator uses task execution date"""
         in_between_date = timezone.datetime(2020, 7, 7, 10, 30, 0)
diff --git a/tests/operators/test_latest_only_operator.py b/tests/operators/test_latest_only_operator.py
index cf1c3ca18f..8c0e3d0ae1 100644
--- a/tests/operators/test_latest_only_operator.py
+++ b/tests/operators/test_latest_only_operator.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 import datetime
 
-from freezegun import freeze_time
+import time_machine
 
 from airflow import settings
 from airflow.models import DagRun, TaskInstance
@@ -64,7 +64,7 @@ class TestLatestOnlyOperator:
             default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
             schedule=INTERVAL,
         )
-        self.freezer = freeze_time(FROZEN_NOW)
+        self.freezer = time_machine.travel(FROZEN_NOW, tick=False)
         self.freezer.start()
 
     def teardown_method(self):
diff --git a/tests/operators/test_weekday.py b/tests/operators/test_weekday.py
index 046e593578..1c843c94c9 100644
--- a/tests/operators/test_weekday.py
+++ b/tests/operators/test_weekday.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 import datetime
 
 import pytest
-from freezegun import freeze_time
+import time_machine
 from parameterized import parameterized
 
 from airflow.exceptions import AirflowException
@@ -94,7 +94,7 @@ class TestBranchDayOfWeekOperator:
     @pytest.mark.parametrize(
         "weekday", TEST_CASE_BRANCH_FOLLOW_TRUE.values(), ids=TEST_CASE_BRANCH_FOLLOW_TRUE.keys()
     )
-    @freeze_time("2021-01-25")  # Monday
+    @time_machine.travel("2021-01-25")  # Monday
     def test_branch_follow_true(self, weekday):
         """Checks if BranchDayOfWeekOperator follows true branch"""
         print(datetime.datetime.now())
@@ -131,7 +131,7 @@ class TestBranchDayOfWeekOperator:
             },
         )
 
-    @freeze_time("2021-01-25")  # Monday
+    @time_machine.travel("2021-01-25")  # Monday
     def test_branch_follow_true_with_execution_date(self):
         """Checks if BranchDayOfWeekOperator follows true branch when set use_task_logical_date"""
 
@@ -166,7 +166,7 @@ class TestBranchDayOfWeekOperator:
             },
         )
 
-    @freeze_time("2021-01-25")  # Monday
+    @time_machine.travel("2021-01-25")  # Monday
     def test_branch_follow_false(self):
         """Checks if BranchDayOfWeekOperator follow false branch"""
 
@@ -245,7 +245,7 @@ class TestBranchDayOfWeekOperator:
                 dag=self.dag,
             )
 
-    @freeze_time("2021-01-25")  # Monday
+    @time_machine.travel("2021-01-25")  # Monday
     def test_branch_xcom_push_true_branch(self):
         """Check if BranchDayOfWeekOperator push to xcom value of follow_task_ids_if_true"""
         branch_op = BranchDayOfWeekOperator(
diff --git a/tests/providers/amazon/aws/hooks/test_eks.py b/tests/providers/amazon/aws/hooks/test_eks.py
index 3d3e51f94a..157c870973 100644
--- a/tests/providers/amazon/aws/hooks/test_eks.py
+++ b/tests/providers/amazon/aws/hooks/test_eks.py
@@ -25,10 +25,10 @@ from unittest import mock
 from urllib.parse import ParseResult, urlsplit
 
 import pytest
+import time_machine
 import yaml
 from _pytest._code import ExceptionInfo
 from botocore.exceptions import ClientError
-from freezegun import freeze_time
 from moto import mock_eks
 from moto.core import DEFAULT_ACCOUNT_ID
 from moto.core.exceptions import AWSError
@@ -298,7 +298,7 @@ class TestEksHooks:
             arn_under_test=generated_test_data.cluster_describe_output[ClusterAttributes.ARN],
         )
 
-    @freeze_time(FROZEN_TIME)
+    @time_machine.travel(FROZEN_TIME, tick=False)
     def test_create_cluster_generates_valid_cluster_created_timestamp(self, cluster_builder) -> None:
         _, generated_test_data = cluster_builder()
 
@@ -515,7 +515,7 @@ class TestEksHooks:
             arn_under_test=generated_test_data.nodegroup_describe_output[NodegroupAttributes.ARN],
         )
 
-    @freeze_time(FROZEN_TIME)
+    @time_machine.travel(FROZEN_TIME)
     def test_create_nodegroup_generates_valid_nodegroup_created_timestamp(self, nodegroup_builder) -> None:
         _, generated_test_data = nodegroup_builder()
 
@@ -523,7 +523,7 @@ class TestEksHooks:
 
         assert iso_date(result_time) == FROZEN_TIME
 
-    @freeze_time(FROZEN_TIME)
+    @time_machine.travel(FROZEN_TIME)
     def test_create_nodegroup_generates_valid_nodegroup_modified_timestamp(self, nodegroup_builder) -> None:
         _, generated_test_data = nodegroup_builder()
 
@@ -917,7 +917,7 @@ class TestEksHooks:
             arn_under_test=generated_test_data.fargate_describe_output[FargateProfileAttributes.ARN],
         )
 
-    @freeze_time(FROZEN_TIME)
+    @time_machine.travel(FROZEN_TIME)
     def test_create_fargate_profile_generates_valid_created_timestamp(self, fargate_profile_builder) -> None:
         _, generated_test_data = fargate_profile_builder()
 
diff --git a/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py b/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py
index 251f8d6258..726c4de7db 100644
--- a/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py
+++ b/tests/providers/amazon/aws/sensors/test_s3_keys_unchanged.py
@@ -21,7 +21,7 @@ from datetime import datetime
 from unittest import mock
 
 import pytest
-from freezegun import freeze_time
+import time_machine
 
 from airflow.models.dag import DAG, AirflowException
 from airflow.providers.amazon.aws.sensors.s3 import S3KeysUnchangedSensor
@@ -68,7 +68,7 @@ class TestS3KeysUnchangedSensor:
             dag=self.dag,
         ).render_template_fields({})
 
-    @freeze_time(DEFAULT_DATE, auto_tick_seconds=10)
+    @time_machine.travel(DEFAULT_DATE)
     def test_files_deleted_between_pokes_throw_error(self):
         self.sensor.allow_delete = False
         self.sensor.is_keys_unchanged({"a", "b"})
@@ -98,19 +98,19 @@ class TestS3KeysUnchangedSensor:
             ),
         ],
     )
-    @freeze_time(DEFAULT_DATE, auto_tick_seconds=10)
-    def test_key_changes(self, current_objects, expected_returns, inactivity_periods):
-        assert self.sensor.is_keys_unchanged(current_objects[0]) == expected_returns[0]
-        assert self.sensor.inactivity_seconds == inactivity_periods[0]
-        assert self.sensor.is_keys_unchanged(current_objects[1]) == expected_returns[1]
-        assert self.sensor.inactivity_seconds == inactivity_periods[1]
-        assert self.sensor.is_keys_unchanged(current_objects[2]) == expected_returns[2]
-        assert self.sensor.inactivity_seconds == inactivity_periods[2]
+    def test_key_changes(self, current_objects, expected_returns, inactivity_periods, time_machine):
+        time_machine.move_to(DEFAULT_DATE)
+        for current, expected, period in zip(current_objects, expected_returns, inactivity_periods):
+            assert self.sensor.is_keys_unchanged(current) == expected
+            assert self.sensor.inactivity_seconds == period
+            time_machine.coordinates.shift(10)
 
-    @freeze_time(DEFAULT_DATE, auto_tick_seconds=10)
     @mock.patch("airflow.providers.amazon.aws.sensors.s3.S3Hook")
-    def test_poke_succeeds_on_upload_complete(self, mock_hook):
+    def test_poke_succeeds_on_upload_complete(self, mock_hook, time_machine):
+        time_machine.move_to(DEFAULT_DATE)
         mock_hook.return_value.list_keys.return_value = {"a"}
         assert not self.sensor.poke(dict())
+        time_machine.coordinates.shift(10)
         assert not self.sensor.poke(dict())
+        time_machine.coordinates.shift(10)
         assert self.sensor.poke(dict())
diff --git a/tests/providers/amazon/aws/utils/test_eks_get_token.py b/tests/providers/amazon/aws/utils/test_eks_get_token.py
index 700bd18373..f4b71cb94b 100644
--- a/tests/providers/amazon/aws/utils/test_eks_get_token.py
+++ b/tests/providers/amazon/aws/utils/test_eks_get_token.py
@@ -23,12 +23,12 @@ import runpy
 from unittest import mock
 
 import pytest
-from freezegun import freeze_time
+import time_machine
 
 
 class TestGetEksToken:
     @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook")
-    @freeze_time("1995-02-14")
+    @time_machine.travel("1995-02-14", tick=False)
     @pytest.mark.parametrize(
         "args, expected_aws_conn_id, expected_region_name",
         [
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index bdf274732e..5e23f0bd92 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -27,7 +27,6 @@ from unittest import mock
 from urllib.parse import quote
 
 import elasticsearch
-import freezegun
 import pendulum
 import pytest
 
@@ -500,7 +499,7 @@ class TestElasticsearchTaskHandler:
         assert self.es_task_handler.supports_external_link == expected
 
     @mock.patch("sys.__stdout__", new_callable=io.StringIO)
-    def test_dynamic_offset(self, stdout_mock, ti):
+    def test_dynamic_offset(self, stdout_mock, ti, time_machine):
         # arrange
         handler = ElasticsearchTaskHandler(
             base_log_folder=self.local_log_location,
@@ -524,12 +523,12 @@ class TestElasticsearchTaskHandler:
         t2, t3 = t1 + pendulum.duration(seconds=5), t1 + pendulum.duration(seconds=10)
 
         # act
-        with freezegun.freeze_time(t1):
-            ti.log.info("Test")
-        with freezegun.freeze_time(t2):
-            ti.log.info("Test2")
-        with freezegun.freeze_time(t3):
-            ti.log.info("Test3")
+        time_machine.move_to(t1, tick=False)
+        ti.log.info("Test")
+        time_machine.move_to(t2, tick=False)
+        ti.log.info("Test2")
+        time_machine.move_to(t3, tick=False)
+        ti.log.info("Test3")
 
         # assert
         first_log, second_log, third_log = map(json.loads, stdout_mock.getvalue().strip().split("\n"))
diff --git a/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py b/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py
index d696ffa14d..3c45636214 100644
--- a/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py
+++ b/tests/providers/google/cloud/operators/test_cloud_storage_transfer_service.py
@@ -24,8 +24,8 @@ from datetime import date, time
 from unittest import mock
 
 import pytest
+import time_machine
 from botocore.credentials import Credentials
-from freezegun import freeze_time
 from parameterized import parameterized
 
 from airflow.exceptions import AirflowException
@@ -189,7 +189,7 @@ class TestTransferJobPreprocessor(unittest.TestCase):
         TransferJobPreprocessor(body=body).process_body()
         assert body[SCHEDULE][START_TIME_OF_DAY] == DICT_TIME
 
-    @freeze_time("2018-10-15")
+    @time_machine.travel("2018-10-15", tick=False)
     def test_should_set_default_schedule(self):
         body = {}
         TransferJobPreprocessor(body=body, default_schedule=True).process_body()
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 8545b99f59..7417c183d8 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -21,7 +21,7 @@ from datetime import timedelta
 from unittest.mock import Mock, patch
 
 import pytest
-from freezegun import freeze_time
+import time_machine
 
 from airflow.exceptions import AirflowException, AirflowRescheduleException, AirflowSensorTimeout
 from airflow.models import TaskReschedule
@@ -158,14 +158,14 @@ class TestBaseSensor:
             if ti.task_id == DUMMY_OP:
                 assert ti.state == State.NONE
 
-    def test_ok_with_reschedule(self, make_sensor):
+    def test_ok_with_reschedule(self, make_sensor, time_machine):
         sensor, dr = make_sensor(return_value=None, poke_interval=10, timeout=25, mode="reschedule")
         sensor.poke = Mock(side_effect=[False, False, True])
 
         # first poke returns False and task is re-scheduled
         date1 = timezone.utcnow()
-        with freeze_time(date1):
-            self._run(sensor)
+        time_machine.move_to(date1, tick=False)
+        self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
@@ -183,9 +183,9 @@ class TestBaseSensor:
                 assert ti.state == State.NONE
 
         # second poke returns False and task is re-scheduled
+        time_machine.coordinates.shift(sensor.poke_interval)
         date2 = date1 + timedelta(seconds=sensor.poke_interval)
-        with freeze_time(date2):
-            self._run(sensor)
+        self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
@@ -203,9 +203,8 @@ class TestBaseSensor:
                 assert ti.state == State.NONE
 
         # third poke returns True and task succeeds
-        date3 = date2 + timedelta(seconds=sensor.poke_interval)
-        with freeze_time(date3):
-            self._run(sensor)
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
@@ -216,13 +215,13 @@ class TestBaseSensor:
             if ti.task_id == DUMMY_OP:
                 assert ti.state == State.NONE
 
-    def test_fail_with_reschedule(self, make_sensor):
+    def test_fail_with_reschedule(self, make_sensor, time_machine):
         sensor, dr = make_sensor(return_value=False, poke_interval=10, timeout=5, mode="reschedule")
 
         # first poke returns False and task is re-scheduled
         date1 = timezone.utcnow()
-        with freeze_time(date1):
-            self._run(sensor)
+        time_machine.move_to(date1, tick=False)
+        self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
@@ -232,10 +231,9 @@ class TestBaseSensor:
                 assert ti.state == State.NONE
 
         # second poke returns False, timeout occurs
-        date2 = date1 + timedelta(seconds=sensor.poke_interval)
-        with freeze_time(date2):
-            with pytest.raises(AirflowSensorTimeout):
-                self._run(sensor)
+        time_machine.coordinates.shift(sensor.poke_interval)
+        with pytest.raises(AirflowSensorTimeout):
+            self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
@@ -244,15 +242,15 @@ class TestBaseSensor:
             if ti.task_id == DUMMY_OP:
                 assert ti.state == State.NONE
 
-    def test_soft_fail_with_reschedule(self, make_sensor):
+    def test_soft_fail_with_reschedule(self, make_sensor, time_machine):
         sensor, dr = make_sensor(
             return_value=False, poke_interval=10, timeout=5, soft_fail=True, mode="reschedule"
         )
 
         # first poke returns False and task is re-scheduled
         date1 = timezone.utcnow()
-        with freeze_time(date1):
-            self._run(sensor)
+        time_machine.move_to(date1, tick=False)
+        self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
@@ -262,9 +260,8 @@ class TestBaseSensor:
                 assert ti.state == State.NONE
 
         # second poke returns False, timeout occurs
-        date2 = date1 + timedelta(seconds=sensor.poke_interval)
-        with freeze_time(date2):
-            self._run(sensor)
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
@@ -273,7 +270,7 @@ class TestBaseSensor:
             if ti.task_id == DUMMY_OP:
                 assert ti.state == State.NONE
 
-    def test_ok_with_reschedule_and_retry(self, make_sensor):
+    def test_ok_with_reschedule_and_retry(self, make_sensor, time_machine):
         sensor, dr = make_sensor(
             return_value=None,
             poke_interval=10,
@@ -286,8 +283,8 @@ class TestBaseSensor:
 
         # first poke returns False and task is re-scheduled
         date1 = timezone.utcnow()
-        with freeze_time(date1):
-            self._run(sensor)
+        time_machine.move_to(date1, tick=False)
+        self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
@@ -303,10 +300,9 @@ class TestBaseSensor:
                 assert ti.state == State.NONE
 
         # second poke timesout and task instance is failed
-        date2 = date1 + timedelta(seconds=sensor.poke_interval)
-        with freeze_time(date2):
-            with pytest.raises(AirflowSensorTimeout):
-                self._run(sensor)
+        time_machine.coordinates.shift(sensor.poke_interval)
+        with pytest.raises(AirflowSensorTimeout):
+            self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
@@ -319,9 +315,9 @@ class TestBaseSensor:
         sensor.clear()
 
         # third poke returns False and task is rescheduled again
-        date3 = date2 + timedelta(seconds=sensor.poke_interval) + sensor.retry_delay
-        with freeze_time(date3):
-            self._run(sensor)
+        date3 = date1 + timedelta(seconds=sensor.poke_interval) * 2 + sensor.retry_delay
+        time_machine.coordinates.shift(sensor.poke_interval + sensor.retry_delay.total_seconds())
+        self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
@@ -337,9 +333,10 @@ class TestBaseSensor:
                 assert ti.state == State.NONE
 
         # fourth poke return True and task succeeds
-        date4 = date3 + timedelta(seconds=sensor.poke_interval)
-        with freeze_time(date4):
-            self._run(sensor)
+
+        time_machine.coordinates.shift(sensor.poke_interval)
+        self._run(sensor)
+
         tis = dr.get_task_instances()
         assert len(tis) == 2
         for ti in tis:
@@ -368,7 +365,7 @@ class TestBaseSensor:
         )
 
         # first poke returns False and task is re-scheduled
-        with freeze_time(date1):
+        with time_machine.travel(date1, tick=False):
             self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
@@ -385,7 +382,7 @@ class TestBaseSensor:
                 assert ti.state == State.NONE
 
         # second poke returns False and task is re-scheduled
-        with freeze_time(date2):
+        with time_machine.travel(date2, tick=False):
             self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
@@ -402,7 +399,7 @@ class TestBaseSensor:
                 assert ti.state == State.NONE
 
         # third poke returns True and task succeeds
-        with freeze_time(date3):
+        with time_machine.travel(date3, tick=False):
             self._run(sensor)
         tis = dr.get_task_instances()
         assert len(tis) == 2
@@ -418,7 +415,7 @@ class TestBaseSensor:
 
         # poke returns False and AirflowRescheduleException is raised
         date1 = timezone.utcnow()
-        with freeze_time(date1):
+        with time_machine.travel(date1, tick=False):
             self._run(sensor, test_mode=True)
         tis = dr.get_task_instances()
         assert len(tis) == 2
@@ -545,7 +542,7 @@ class TestBaseSensor:
         sensor, _ = make_sensor(poke_interval=60 * 60 * 24, mode="reschedule", return_value=False)
 
         # A few hours until TIMESTAMP's limit, the next poke will take us over.
-        with freeze_time(datetime(2038, 1, 19, tzinfo=timezone.utc)):
+        with time_machine.travel(datetime(2038, 1, 19, tzinfo=timezone.utc), tick=False):
             with pytest.raises(AirflowSensorTimeout) as ctx:
                 self._run(sensor)
         assert str(ctx.value) == (
@@ -553,7 +550,7 @@ class TestBaseSensor:
             "since it is over MySQL's TIMESTAMP storage limit."
         )
 
-    def test_reschedule_and_retry_timeout(self, make_sensor):
+    def test_reschedule_and_retry_timeout(self, make_sensor, time_machine):
         """
         Test mode="reschedule", retries and timeout configurations interact correctly.
 
@@ -606,42 +603,40 @@ class TestBaseSensor:
 
         # first poke returns False and task is re-scheduled
         date1 = timezone.utcnow()
-        with freeze_time(date1):
-            self._run(sensor)
+        time_machine.move_to(date1, tick=False)
+        self._run(sensor)
         assert_ti_state(1, 2, State.UP_FOR_RESCHEDULE)
 
         # second poke raises RuntimeError and task instance retries
-        date2 = date1 + timedelta(seconds=sensor.poke_interval)
-        with freeze_time(date2), pytest.raises(RuntimeError):
+        time_machine.coordinates.shift(sensor.poke_interval)
+        with pytest.raises(RuntimeError):
             self._run(sensor)
         assert_ti_state(2, 2, State.UP_FOR_RETRY)
 
         # third poke returns False and task is rescheduled again
-        date3 = date2 + sensor.retry_delay + timedelta(seconds=1)
-        with freeze_time(date3):
-            self._run(sensor)
+        time_machine.coordinates.shift(sensor.retry_delay + timedelta(seconds=1))
+        self._run(sensor)
         assert_ti_state(2, 2, State.UP_FOR_RESCHEDULE)
 
         # fourth poke times out and raises AirflowSensorTimeout
-        date4 = date3 + timedelta(seconds=sensor.poke_interval)
-        with freeze_time(date4), pytest.raises(AirflowSensorTimeout):
+        time_machine.coordinates.shift(sensor.poke_interval)
+        with pytest.raises(AirflowSensorTimeout):
             self._run(sensor)
         assert_ti_state(3, 2, State.FAILED)
 
         # Clear the failed sensor
         sensor.clear()
 
-        date_i = date4 + timedelta(seconds=20)
+        time_machine.coordinates.shift(20)
 
         for _ in range(3):
-            date_i += timedelta(seconds=sensor.poke_interval)
-            with freeze_time(date_i):
-                self._run(sensor)
+            time_machine.coordinates.shift(sensor.poke_interval)
+            self._run(sensor)
             assert_ti_state(3, 4, State.UP_FOR_RESCHEDULE)
 
         # Last poke times out and raises AirflowSensorTimeout
-        date8 = date_i + timedelta(seconds=sensor.poke_interval)
-        with freeze_time(date8), pytest.raises(AirflowSensorTimeout):
+        time_machine.coordinates.shift(sensor.poke_interval)
+        with pytest.raises(AirflowSensorTimeout):
             self._run(sensor)
         assert_ti_state(4, 4, State.FAILED)
 
diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py
index 70b875311c..2ccfdd2c42 100644
--- a/tests/sensors/test_time_sensor.py
+++ b/tests/sensors/test_time_sensor.py
@@ -20,9 +20,9 @@ from __future__ import annotations
 from datetime import datetime, time
 from unittest.mock import patch
 
-import freezegun
 import pendulum
 import pytest
+import time_machine
 
 from airflow.exceptions import TaskDeferred
 from airflow.models.dag import DAG
@@ -35,10 +35,6 @@ DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1)
 DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE))
 
 
-@patch(
-    "airflow.sensors.time_sensor.timezone.utcnow",
-    return_value=timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc),
-)
 class TestTimeSensor:
     @pytest.mark.parametrize(
         "default_timezone, start_date, expected",
@@ -48,7 +44,8 @@ class TestTimeSensor:
             (DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False),
         ],
     )
-    def test_timezone(self, mock_utcnow, default_timezone, start_date, expected):
+    @time_machine.travel(timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc))
+    def test_timezone(self, default_timezone, start_date, expected):
         with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)):
             dag = DAG("test", default_args={"start_date": start_date})
             op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
@@ -56,7 +53,7 @@ class TestTimeSensor:
 
 
 class TestTimeSensorAsync:
-    @freezegun.freeze_time("2020-07-07 00:00:00")
+    @time_machine.travel("2020-07-07 00:00:00", tick=False)
     def test_task_is_deferred(self):
         with DAG("test_task_is_deferred", start_date=timezone.datetime(2020, 1, 1, 23, 0)):
             op = TimeSensorAsync(task_id="test", target_time=time(10, 0))
diff --git a/tests/ti_deps/deps/test_not_in_retry_period_dep.py b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
index 107376d2a3..2abf42273a 100644
--- a/tests/ti_deps/deps/test_not_in_retry_period_dep.py
+++ b/tests/ti_deps/deps/test_not_in_retry_period_dep.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 from datetime import timedelta
 from unittest.mock import Mock
 
-from freezegun import freeze_time
+import time_machine
 
 from airflow.models import TaskInstance
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
@@ -35,7 +35,7 @@ class TestNotInRetryPeriodDep:
         ti.end_date = end_date
         return ti
 
-    @freeze_time("2016-01-01 15:44")
+    @time_machine.travel("2016-01-01 15:44")
     def test_still_in_retry_period(self):
         """
         Task instances that are in their retry period should fail this dep
@@ -44,7 +44,7 @@ class TestNotInRetryPeriodDep:
         assert ti.is_premature
         assert not NotInRetryPeriodDep().is_met(ti=ti)
 
-    @freeze_time("2016-01-01 15:46")
+    @time_machine.travel("2016-01-01 15:46")
     def test_retry_period_finished(self):
         """
         Task instance's that have had their retry period elapse should pass this dep
diff --git a/tests/ti_deps/deps/test_runnable_exec_date_dep.py b/tests/ti_deps/deps/test_runnable_exec_date_dep.py
index eebde0cd42..4df559e489 100644
--- a/tests/ti_deps/deps/test_runnable_exec_date_dep.py
+++ b/tests/ti_deps/deps/test_runnable_exec_date_dep.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 from unittest.mock import Mock, patch
 
 import pytest
-from freezegun import freeze_time
+import time_machine
 
 from airflow import settings
 from airflow.models import DagRun, TaskInstance
@@ -36,7 +36,7 @@ def clean_db(session):
     session.query(TaskInstance).delete()
 
 
-@freeze_time("2016-11-01")
+@time_machine.travel("2016-11-01")
 @pytest.mark.parametrize(
     "allow_trigger_in_future,schedule_interval,execution_date,is_met",
     [
@@ -74,7 +74,7 @@ def test_exec_date_dep(
         assert RunnableExecDateDep().is_met(ti=ti) == is_met
 
 
-@freeze_time("2016-01-01")
+@time_machine.travel("2016-01-01")
 def test_exec_date_after_end_date(session, dag_maker, create_dummy_dag):
     """
     If the dag's execution date is in the future this dep should fail
diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py
index 3c324672a7..596c274cf7 100644
--- a/tests/timetables/test_interval_timetable.py
+++ b/tests/timetables/test_interval_timetable.py
@@ -20,9 +20,9 @@ from __future__ import annotations
 import datetime
 
 import dateutil.relativedelta
-import freezegun
 import pendulum
 import pytest
+import time_machine
 
 from airflow.exceptions import AirflowTimetableInvalid
 from airflow.settings import TIMEZONE
@@ -52,7 +52,7 @@ DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)
     "last_automated_data_interval",
     [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")],
 )
-@freezegun.freeze_time(CURRENT_TIME)
+@time_machine.travel(CURRENT_TIME)
 def test_no_catchup_first_starts_at_current_time(
     last_automated_data_interval: DataInterval | None,
 ) -> None:
@@ -73,7 +73,7 @@ def test_no_catchup_first_starts_at_current_time(
     "catchup",
     [pytest.param(True, id="catchup_true"), pytest.param(False, id="catchup_false")],
 )
-@freezegun.freeze_time(CURRENT_TIME)
+@time_machine.travel(CURRENT_TIME)
 def test_new_schedule_interval_next_info_starts_at_new_time(
     earliest: pendulum.DateTime | None,
     catchup: bool,
@@ -100,7 +100,7 @@ def test_new_schedule_interval_next_info_starts_at_new_time(
     "last_automated_data_interval",
     [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")],
 )
-@freezegun.freeze_time(CURRENT_TIME)
+@time_machine.travel(CURRENT_TIME)
 def test_no_catchup_next_info_starts_at_current_time(
     timetable: Timetable,
     last_automated_data_interval: DataInterval | None,
diff --git a/tests/timetables/test_trigger_timetable.py b/tests/timetables/test_trigger_timetable.py
index cabb1198ef..6f1d44479f 100644
--- a/tests/timetables/test_trigger_timetable.py
+++ b/tests/timetables/test_trigger_timetable.py
@@ -20,10 +20,10 @@ import datetime
 import typing
 
 import dateutil.relativedelta
-import freezegun
 import pendulum
 import pendulum.tz
 import pytest
+import time_machine
 
 from airflow.exceptions import AirflowTimetableInvalid
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction
@@ -63,7 +63,7 @@ DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)
         ),
     ],
 )
-@freezegun.freeze_time(CURRENT_TIME)
+@time_machine.travel(CURRENT_TIME)
 def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule(
     last_automated_data_interval: DataInterval | None,
     next_start_time: pendulum.DateTime,
@@ -105,7 +105,7 @@ def test_hourly_cron_trigger_no_catchup_next_info(
     earliest: pendulum.DateTime,
     expected: DagRunInfo,
 ) -> None:
-    with freezegun.freeze_time(current_time):
+    with time_machine.travel(current_time):
         next_info = HOURLY_CRON_TRIGGER_TIMETABLE.next_dagrun_info(
             last_automated_data_interval=PREV_DATA_INTERVAL_EXACT,
             restriction=TimeRestriction(earliest=earliest, latest=None, catchup=False),
diff --git a/tests/utils/log/test_file_processor_handler.py b/tests/utils/log/test_file_processor_handler.py
index 1d479c71da..e11ff35b0c 100644
--- a/tests/utils/log/test_file_processor_handler.py
+++ b/tests/utils/log/test_file_processor_handler.py
@@ -21,7 +21,7 @@ import os
 import shutil
 from datetime import timedelta
 
-from freezegun import freeze_time
+import time_machine
 
 from airflow.utils import timezone
 from airflow.utils.log.file_processor_handler import FileProcessorHandler
@@ -77,13 +77,13 @@ class TestFileProcessorHandler:
 
         link = os.path.join(self.base_log_folder, "latest")
 
-        with freeze_time(date1):
+        with time_machine.travel(date1, tick=False):
             handler.set_context(filename=os.path.join(self.dag_dir, "log1"))
             assert os.path.islink(link)
             assert os.path.basename(os.readlink(link)) == date1
             assert os.path.exists(os.path.join(link, "log1"))
 
-        with freeze_time(date2):
+        with time_machine.travel(date2, tick=False):
             handler.set_context(filename=os.path.join(self.dag_dir, "log2"))
             assert os.path.islink(link)
             assert os.path.basename(os.readlink(link)) == date2
@@ -104,7 +104,7 @@ class TestFileProcessorHandler:
             os.remove(link)
         os.makedirs(link)
 
-        with freeze_time(date1):
+        with time_machine.travel(date1, tick=False):
             handler.set_context(filename=os.path.join(self.dag_dir, "log1"))
 
     def teardown_method(self):
diff --git a/tests/utils/test_serve_logs.py b/tests/utils/test_serve_logs.py
index e306c50e2b..5288e646ed 100644
--- a/tests/utils/test_serve_logs.py
+++ b/tests/utils/test_serve_logs.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING
 
 import jwt
 import pytest
-from freezegun import freeze_time
+import time_machine
 
 from airflow.utils.jwt_signer import JWTSigner
 from airflow.utils.serve_logs import create_app
@@ -92,7 +92,7 @@ class TestServeLogs:
         assert response.status_code == 403
 
     def test_forbidden_expired(self, client: FlaskClient, signer):
-        with freeze_time("2010-01-14"):
+        with time_machine.travel("2010-01-14"):
             token = signer.generate_signed_token({"filename": "sample.log"})
         assert (
             client.get(
@@ -105,7 +105,7 @@ class TestServeLogs:
         )
 
     def test_forbidden_future(self, client: FlaskClient, signer):
-        with freeze_time(datetime.datetime.utcnow() + datetime.timedelta(seconds=3600)):
+        with time_machine.travel(datetime.datetime.utcnow() + datetime.timedelta(seconds=3600)):
             token = signer.generate_signed_token({"filename": "sample.log"})
         assert (
             client.get(
@@ -118,7 +118,7 @@ class TestServeLogs:
         )
 
     def test_ok_with_short_future_skew(self, client: FlaskClient, signer):
-        with freeze_time(datetime.datetime.utcnow() + datetime.timedelta(seconds=1)):
+        with time_machine.travel(datetime.datetime.utcnow() + datetime.timedelta(seconds=1)):
             token = signer.generate_signed_token({"filename": "sample.log"})
         assert (
             client.get(
@@ -131,7 +131,7 @@ class TestServeLogs:
         )
 
     def test_ok_with_short_past_skew(self, client: FlaskClient, signer):
-        with freeze_time(datetime.datetime.utcnow() - datetime.timedelta(seconds=31)):
+        with time_machine.travel(datetime.datetime.utcnow() - datetime.timedelta(seconds=31)):
             token = signer.generate_signed_token({"filename": "sample.log"})
         assert (
             client.get(
@@ -144,7 +144,7 @@ class TestServeLogs:
         )
 
     def test_forbidden_with_long_future_skew(self, client: FlaskClient, signer):
-        with freeze_time(datetime.datetime.utcnow() + datetime.timedelta(seconds=10)):
+        with time_machine.travel(datetime.datetime.utcnow() + datetime.timedelta(seconds=10)):
             token = signer.generate_signed_token({"filename": "sample.log"})
         assert (
             client.get(
@@ -157,7 +157,7 @@ class TestServeLogs:
         )
 
     def test_forbidden_with_long_past_skew(self, client: FlaskClient, signer):
-        with freeze_time(datetime.datetime.utcnow() - datetime.timedelta(seconds=40)):
+        with time_machine.travel(datetime.datetime.utcnow() - datetime.timedelta(seconds=40)):
             token = signer.generate_signed_token({"filename": "sample.log"})
         assert (
             client.get(
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index b34842a92d..31e6ca3b46 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -23,9 +23,9 @@ import logging
 from unittest import mock
 
 import pytest
+import time_machine
 from flask_appbuilder import SQLA, Model, expose, has_access
 from flask_appbuilder.views import BaseView, ModelView
-from freezegun import freeze_time
 from sqlalchemy import Column, Date, Float, Integer, String
 
 from airflow.exceptions import AirflowException
@@ -904,7 +904,7 @@ def old_user():
     return user
 
 
-@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0))  # Get the Delorean, doc!
+@time_machine.travel(datetime.datetime(1985, 11, 5, 1, 24, 0), tick=False)
 def test_update_user_auth_stat_first_successful_auth(mock_security_manager, new_user):
     mock_security_manager.update_user_auth_stat(new_user, success=True)
 
@@ -914,7 +914,7 @@ def test_update_user_auth_stat_first_successful_auth(mock_security_manager, new_
     assert mock_security_manager.update_user.called_once
 
 
-@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0))
+@time_machine.travel(datetime.datetime(1985, 11, 5, 1, 24, 0), tick=False)
 def test_update_user_auth_stat_subsequent_successful_auth(mock_security_manager, old_user):
     mock_security_manager.update_user_auth_stat(old_user, success=True)
 
@@ -924,7 +924,7 @@ def test_update_user_auth_stat_subsequent_successful_auth(mock_security_manager,
     assert mock_security_manager.update_user.called_once
 
 
-@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0))
+@time_machine.travel(datetime.datetime(1985, 11, 5, 1, 24, 0), tick=False)
 def test_update_user_auth_stat_first_unsuccessful_auth(mock_security_manager, new_user):
     mock_security_manager.update_user_auth_stat(new_user, success=False)
 
@@ -934,7 +934,7 @@ def test_update_user_auth_stat_first_unsuccessful_auth(mock_security_manager, ne
     assert mock_security_manager.update_user.called_once
 
 
-@freeze_time(datetime.datetime(1985, 11, 5, 1, 24, 0))
+@time_machine.travel(datetime.datetime(1985, 11, 5, 1, 24, 0), tick=False)
 def test_update_user_auth_stat_subsequent_unsuccessful_auth(mock_security_manager, old_user):
     mock_security_manager.update_user_auth_stat(old_user, success=False)
 
diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py
index 40dd1ce917..bc17ddc5de 100644
--- a/tests/www/views/test_views_grid.py
+++ b/tests/www/views/test_views_grid.py
@@ -17,8 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime, timedelta
-
 import pendulum
 import pytest
 from dateutil.tz import UTC
@@ -29,6 +27,7 @@ from airflow.models import DagBag
 from airflow.models.dagrun import DagRun
 from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
 from airflow.operators.empty import EmptyOperator
+from airflow.utils import timezone
 from airflow.utils.state import DagRunState, TaskInstanceState
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.types import DagRunType
@@ -129,6 +128,14 @@ def test_no_runs(admin_client, dag_without_runs):
     }
 
 
+# Create this as a fixture so that it is applied before the `dag_with_runs` fixture is!
+@pytest.fixture
+def freeze_time_for_dagruns(time_machine):
+    time_machine.move_to("2022-01-02T00:00:00+00:00", tick=False)
+    yield
+
+
+@pytest.mark.usefixtures("freeze_time_for_dagruns")
 def test_one_run(admin_client, dag_with_runs: list[DagRun], session):
     """
     Test a DAG with complex interaction of states:
@@ -159,22 +166,14 @@ def test_one_run(admin_client, dag_with_runs: list[DagRun], session):
 
     assert resp.status_code == 200, resp.json
 
-    # We cannot use freezegun here as it does not play well with Flask 2.2 and SqlAlchemy
-    # Unlike real datetime, when FakeDatetime is used, it coerces to
-    # '2020-08-06 09:00:00+00:00' which is rejected by MySQL for EXPIRY Column
-    current_date_placeholder = "2022-01-02T00:00:00+00:00"
-    actual_date_in_json = datetime.fromisoformat(resp.json["dag_runs"][0]["end_date"])
-    assert datetime.now(tz=UTC) - actual_date_in_json < timedelta(minutes=5)
-    res = resp.json
-    res["dag_runs"][0]["end_date"] = current_date_placeholder
-    assert res == {
+    assert resp.json == {
         "dag_runs": [
             {
                 "conf": None,
                 "conf_is_json": False,
                 "data_interval_end": "2016-01-02T00:00:00+00:00",
                 "data_interval_start": "2016-01-01T00:00:00+00:00",
-                "end_date": current_date_placeholder,
+                "end_date": timezone.utcnow().isoformat(),
                 "execution_date": "2016-01-01T00:00:00+00:00",
                 "external_trigger": False,
                 "last_scheduling_decision": None,
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index ab3b40f0b2..1543db30f6 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -22,10 +22,9 @@ import json
 import re
 import unittest.mock
 import urllib.parse
-from datetime import timedelta
 
-import freezegun
 import pytest
+import time_machine
 
 from airflow import settings
 from airflow.exceptions import AirflowException
@@ -61,7 +60,7 @@ def reset_dagruns():
 
 @pytest.fixture(autouse=True)
 def init_dagruns(app, reset_dagruns):
-    with freezegun.freeze_time(DEFAULT_DATE):
+    with time_machine.travel(DEFAULT_DATE, tick=False):
         app.dag_bag.get_dag("example_bash_operator").create_dagrun(
             run_id=DEFAULT_DAGRUN,
             run_type=DagRunType.SCHEDULED,
@@ -563,7 +562,7 @@ def test_run_with_runnable_states(_, admin_client, session, state):
     "airflow.executors.executor_loader.ExecutorLoader.get_default_executor",
     return_value=_ForceHeartbeatCeleryExecutor(),
 )
-def test_run_ignoring_deps_sets_queued_dttm(_, admin_client, session):
+def test_run_ignoring_deps_sets_queued_dttm(_, admin_client, session, time_machine):
     task_id = "runme_0"
     session.query(TaskInstance).filter(TaskInstance.task_id == task_id).update(
         {"state": State.SCHEDULED, "queued_dttm": None}
@@ -579,15 +578,13 @@ def test_run_ignoring_deps_sets_queued_dttm(_, admin_client, session):
         dag_run_id=DEFAULT_DAGRUN,
         origin="/home",
     )
+    now = timezone.utcnow()
+
+    time_machine.move_to(now, tick=False)
     resp = admin_client.post("run", data=form, follow_redirects=True)
 
     assert resp.status_code == 200
-    # We cannot use freezegun here as it does not play well with Flask 2.2 and SqlAlchemy
-    # Unlike real datetime, when FakeDatetime is used, it coerces to
-    # '2020-08-06 09:00:00+00:00' which is rejected by MySQL for EXPIRY Column
-    assert timezone.utcnow() - session.query(TaskInstance.queued_dttm).filter(
-        TaskInstance.task_id == task_id
-    ).scalar() < timedelta(minutes=5)
+    assert session.query(TaskInstance.queued_dttm).filter(TaskInstance.task_id == task_id).scalar() == now
 
 
 @pytest.mark.parametrize("state", QUEUEABLE_STATES)