Posted to commits@airflow.apache.org by po...@apache.org on 2023/08/25 20:10:57 UTC

[airflow] branch main updated: Refactor: tmp_path in tests/dag_processing (#33740)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new eb93f67b2b Refactor: tmp_path in tests/dag_processing (#33740)
eb93f67b2b is described below

commit eb93f67b2b2478dfe413778cbd4673c95b4ca773
Author: Miroslav Šedivý <67...@users.noreply.github.com>
AuthorDate: Fri Aug 25 20:10:45 2023 +0000

    Refactor: tmp_path in tests/dag_processing (#33740)
---
 tests/dag_processing/test_job_runner.py | 106 +++++++++++++++-----------------
 1 file changed, 51 insertions(+), 55 deletions(-)
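
The diff replaces pytest's legacy "tmpdir" fixture (a py.path.local object)
with the stdlib-backed "tmp_path" fixture (a pathlib.Path), so the tests can
use Path methods such as write_text() and unlink() instead of manual file
handles and py.path-specific calls. A minimal sketch of the before/after
pattern, with illustrative test names not taken from this commit:

    # Before: py.path.local from the deprecated tmpdir fixture
    def test_parse_dag_old(tmpdir):
        filename = tmpdir / "temp_dag.py"           # py.path.local
        with open(filename, "w") as f:              # manual file handle
            f.writelines("an invalid airflow DAG")
        filename.remove()                           # py.path-specific API

    # After: pathlib.Path from the tmp_path fixture
    def test_parse_dag_new(tmp_path):
        path = tmp_path / "temp_dag.py"             # pathlib.Path
        path.write_text("an invalid airflow DAG")   # one-shot write
        path.unlink()                               # stdlib removal

Both variants run under pytest, which injects the fixtures as arguments.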

diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index e300eb9866..8189c9a219 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -29,7 +29,6 @@ import threading
 import time
 from datetime import datetime, timedelta
 from logging.config import dictConfig
-from tempfile import TemporaryDirectory
 from textwrap import dedent
 from unittest import mock
 from unittest.mock import MagicMock, PropertyMock
@@ -145,12 +144,11 @@ class TestDagProcessorJobRunner:
             raise RuntimeError("Shouldn't get here - nothing to read, but manager not finished!")
 
     @conf_vars({("core", "load_examples"): "False"})
-    def test_remove_file_clears_import_error(self, tmpdir):
-        filename_to_parse = tmpdir / "temp_dag.py"
+    def test_remove_file_clears_import_error(self, tmp_path):
+        path_to_parse = tmp_path / "temp_dag.py"
 
         # Generate original import error
-        with open(filename_to_parse, "w") as file_to_parse:
-            file_to_parse.writelines("an invalid airflow DAG")
+        path_to_parse.write_text("an invalid airflow DAG")
 
         child_pipe, parent_pipe = multiprocessing.Pipe()
 
@@ -158,7 +156,7 @@ class TestDagProcessorJobRunner:
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=path_to_parse.parent,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -174,7 +172,7 @@ class TestDagProcessorJobRunner:
             import_errors = session.query(errors.ImportError).all()
             assert len(import_errors) == 1
 
-            filename_to_parse.remove()
+            path_to_parse.unlink()
 
             # Rerun the scheduler once the dag file has been removed
             self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -187,25 +185,24 @@ class TestDagProcessorJobRunner:
         parent_pipe.close()
 
     @conf_vars({("core", "load_examples"): "False"})
-    def test_max_runs_when_no_files(self):
+    def test_max_runs_when_no_files(self, tmp_path):
         child_pipe, parent_pipe = multiprocessing.Pipe()
 
-        with TemporaryDirectory(prefix="empty-airflow-dags-") as dags_folder:
-            async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
-            manager = DagProcessorJobRunner(
-                job=Job(),
-                processor=DagFileProcessorManager(
-                    dag_directory=dags_folder,
-                    max_runs=1,
-                    processor_timeout=timedelta(days=365),
-                    signal_conn=child_pipe,
-                    dag_ids=[],
-                    pickle_dags=False,
-                    async_mode=async_mode,
-                ),
-            )
+        async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
+        manager = DagProcessorJobRunner(
+            job=Job(),
+            processor=DagFileProcessorManager(
+                dag_directory=os.fspath(tmp_path),
+                max_runs=1,
+                processor_timeout=timedelta(days=365),
+                signal_conn=child_pipe,
+                dag_ids=[],
+                pickle_dags=False,
+                async_mode=async_mode,
+            ),
+        )
 
-            self.run_processor_manager_one_loop(manager, parent_pipe)
+        self.run_processor_manager_one_loop(manager, parent_pipe)
         child_pipe.close()
         parent_pipe.close()
 
@@ -893,16 +890,15 @@ class TestDagProcessorJobRunner:
 
     @conf_vars({("core", "load_examples"): "False"})
     @mock.patch("airflow.dag_processing.manager.Stats.timing")
-    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
-        filename_to_parse = tmpdir / "temp_dag.py"
+    def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmp_path):
+        path_to_parse = tmp_path / "temp_dag.py"
         dag_code = dedent(
             """
         from airflow import DAG
         dag = DAG(dag_id='temp_dag', schedule='0 0 * * *')
         """
         )
-        with open(filename_to_parse, "w") as file_to_parse:
-            file_to_parse.writelines(dag_code)
+        path_to_parse.write_text(dag_code)
 
         child_pipe, parent_pipe = multiprocessing.Pipe()
 
@@ -910,7 +906,7 @@ class TestDagProcessorJobRunner:
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=path_to_parse.parent,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -938,7 +934,7 @@ class TestDagProcessorJobRunner:
             any_order=True,
         )
 
-    def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
+    def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path):
         """Test DagProcessorJobRunner._refresh_dag_dir method"""
         manager = DagProcessorJobRunner(
             job=Job(),
@@ -952,7 +948,7 @@ class TestDagProcessorJobRunner:
                 async_mode=True,
             ),
         )
-        dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+        dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
         dagbag.process_file(zipped_dag_path)
         dag = dagbag.get_dag("test_zip_dag")
@@ -967,7 +963,7 @@ class TestDagProcessorJobRunner:
         # assert dag still active
         assert dag.get_is_active()
 
-    def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmpdir):
+    def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path):
         """Test DagProcessorJobRunner._refresh_dag_dir method"""
         manager = DagProcessorJobRunner(
             job=Job(),
@@ -981,7 +977,7 @@ class TestDagProcessorJobRunner:
                 async_mode=True,
             ),
         )
-        dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+        dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
         dagbag.process_file(zipped_dag_path)
         dag = dagbag.get_dag("test_zip_dag")
@@ -1000,10 +996,10 @@ class TestDagProcessorJobRunner:
         # assert dag deactivated
         assert not dag.get_is_active()
 
-    def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, tmpdir):
+    def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, tmp_path):
         """Test DagProcessorJobRunner._refresh_dag_dir should not update dags outside its processor_subdir"""
 
-        dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+        dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
         dag_path = os.path.join(TEST_DAGS_FOLDER, "test_miscellaneous.py")
         dagbag.process_file(dag_path)
         dag = dagbag.get_dag("miscellaneous_test_dag")
@@ -1040,7 +1036,7 @@ class TestDagProcessorJobRunner:
             ("scheduler", "standalone_dag_processor"): "True",
         }
     )
-    def test_fetch_callbacks_from_database(self, tmpdir):
+    def test_fetch_callbacks_from_database(self, tmp_path):
         """Test DagProcessorJobRunner._fetch_callbacks method"""
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
@@ -1048,20 +1044,20 @@ class TestDagProcessorJobRunner:
             dag_id="test_start_date_scheduling",
             full_filepath=str(dag_filepath),
             is_failure_callback=True,
-            processor_subdir=str(tmpdir),
+            processor_subdir=os.fspath(tmp_path),
             run_id="123",
         )
         callback2 = DagCallbackRequest(
             dag_id="test_start_date_scheduling",
             full_filepath=str(dag_filepath),
             is_failure_callback=True,
-            processor_subdir=str(tmpdir),
+            processor_subdir=os.fspath(tmp_path),
             run_id="456",
         )
         callback3 = SlaCallbackRequest(
             dag_id="test_start_date_scheduling",
             full_filepath=str(dag_filepath),
-            processor_subdir=str(tmpdir),
+            processor_subdir=os.fspath(tmp_path),
         )
 
         with create_session() as session:
@@ -1073,7 +1069,7 @@ class TestDagProcessorJobRunner:
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=str(tmpdir),
+                dag_directory=os.fspath(tmp_path),
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1093,7 +1089,7 @@ class TestDagProcessorJobRunner:
             ("scheduler", "standalone_dag_processor"): "True",
         }
     )
-    def test_fetch_callbacks_for_current_dag_directory_only(self, tmpdir):
+    def test_fetch_callbacks_for_current_dag_directory_only(self, tmp_path):
         """Test DagProcessorJobRunner._fetch_callbacks method"""
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
@@ -1101,7 +1097,7 @@ class TestDagProcessorJobRunner:
             dag_id="test_start_date_scheduling",
             full_filepath=str(dag_filepath),
             is_failure_callback=True,
-            processor_subdir=str(tmpdir),
+            processor_subdir=os.fspath(tmp_path),
             run_id="123",
         )
         callback2 = DagCallbackRequest(
@@ -1120,7 +1116,7 @@ class TestDagProcessorJobRunner:
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=tmp_path,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1141,7 +1137,7 @@ class TestDagProcessorJobRunner:
             ("core", "load_examples"): "False",
         }
     )
-    def test_fetch_callbacks_from_database_max_per_loop(self, tmpdir):
+    def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path):
         """Test DagProcessorJobRunner._fetch_callbacks method"""
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
@@ -1152,7 +1148,7 @@ class TestDagProcessorJobRunner:
                     full_filepath=str(dag_filepath),
                     is_failure_callback=True,
                     run_id=str(i),
-                    processor_subdir=str(tmpdir),
+                    processor_subdir=os.fspath(tmp_path),
                 )
                 session.add(DbCallbackRequest(callback=callback, priority_weight=i))
 
@@ -1160,7 +1156,7 @@ class TestDagProcessorJobRunner:
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=str(tmpdir),
+                dag_directory=str(tmp_path),
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1184,7 +1180,7 @@ class TestDagProcessorJobRunner:
             ("core", "load_examples"): "False",
         }
     )
-    def test_fetch_callbacks_from_database_not_standalone(self, tmpdir):
+    def test_fetch_callbacks_from_database_not_standalone(self, tmp_path):
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
         with create_session() as session:
@@ -1192,7 +1188,7 @@ class TestDagProcessorJobRunner:
                 dag_id="test_start_date_scheduling",
                 full_filepath=str(dag_filepath),
                 is_failure_callback=True,
-                processor_subdir=str(tmpdir),
+                processor_subdir=str(tmp_path),
                 run_id="123",
             )
             session.add(DbCallbackRequest(callback=callback, priority_weight=10))
@@ -1201,7 +1197,7 @@ class TestDagProcessorJobRunner:
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=tmpdir,
+                dag_directory=tmp_path,
                 max_runs=1,
                 processor_timeout=timedelta(days=365),
                 signal_conn=child_pipe,
@@ -1219,7 +1215,7 @@ class TestDagProcessorJobRunner:
         with create_session() as session:
             assert session.query(DbCallbackRequest).count() == 1
 
-    def test_callback_queue(self, tmpdir):
+    def test_callback_queue(self, tmp_path):
         # given
         manager = DagProcessorJobRunner(
             job=Job(),
@@ -1239,7 +1235,7 @@ class TestDagProcessorJobRunner:
             dag_id="dag1",
             run_id="run1",
             is_failure_callback=False,
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
             msg=None,
         )
         dag1_req2 = DagCallbackRequest(
@@ -1247,18 +1243,18 @@ class TestDagProcessorJobRunner:
             dag_id="dag1",
             run_id="run1",
             is_failure_callback=False,
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
             msg=None,
         )
         dag1_sla1 = SlaCallbackRequest(
             full_filepath="/green_eggs/ham/file1.py",
             dag_id="dag1",
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
         )
         dag1_sla2 = SlaCallbackRequest(
             full_filepath="/green_eggs/ham/file1.py",
             dag_id="dag1",
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
         )
 
         dag2_req1 = DagCallbackRequest(
@@ -1266,14 +1262,14 @@ class TestDagProcessorJobRunner:
             dag_id="dag2",
             run_id="run1",
             is_failure_callback=False,
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
             msg=None,
         )
 
         dag3_sla1 = SlaCallbackRequest(
             full_filepath="/green_eggs/ham/file3.py",
             dag_id="dag3",
-            processor_subdir=tmpdir,
+            processor_subdir=tmp_path,
         )
 
         # when
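
Several hunks also change how the fixture is stringified for APIs that take a
plain path string: str(tmpdir) becomes os.fspath(tmp_path). For a
pathlib.Path the two conversions produce the same string; os.fspath is simply
the explicit os.PathLike protocol and rejects non-path arguments rather than
silently stringifying them. A minimal sketch, using an illustrative directory
such as tmp_path might yield:

    import os
    from pathlib import Path

    p = Path("/tmp/pytest-of-user/pytest-0/test0")  # example tmp_path value
    assert os.fspath(p) == str(p)                   # identical for Path
    # os.fspath raises TypeError for objects that are not os.PathLike:
    # os.fspath(123)  -> TypeError
    # str(123)        -> "123"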