You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@airflow.apache.org by po...@apache.org on 2023/08/25 20:10:57 UTC
[airflow] branch main updated: Refactor: tmp_path in tests/dag_processing (#33740)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new eb93f67b2b Refactor: tmp_path in tests/dag_processing (#33740)
eb93f67b2b is described below
commit eb93f67b2b2478dfe413778cbd4673c95b4ca773
Author: Miroslav Šedivý <67...@users.noreply.github.com>
AuthorDate: Fri Aug 25 20:10:45 2023 +0000
Refactor: tmp_path in tests/dag_processing (#33740)
---
tests/dag_processing/test_job_runner.py | 106 +++++++++++++++-----------------
1 file changed, 51 insertions(+), 55 deletions(-)
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index e300eb9866..8189c9a219 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -29,7 +29,6 @@ import threading
import time
from datetime import datetime, timedelta
from logging.config import dictConfig
-from tempfile import TemporaryDirectory
from textwrap import dedent
from unittest import mock
from unittest.mock import MagicMock, PropertyMock
@@ -145,12 +144,11 @@ class TestDagProcessorJobRunner:
raise RuntimeError("Shouldn't get here - nothing to read, but manager not finished!")
@conf_vars({("core", "load_examples"): "False"})
- def test_remove_file_clears_import_error(self, tmpdir):
- filename_to_parse = tmpdir / "temp_dag.py"
+ def test_remove_file_clears_import_error(self, tmp_path):
+ path_to_parse = tmp_path / "temp_dag.py"
# Generate original import error
- with open(filename_to_parse, "w") as file_to_parse:
- file_to_parse.writelines("an invalid airflow DAG")
+ path_to_parse.write_text("an invalid airflow DAG")
child_pipe, parent_pipe = multiprocessing.Pipe()
@@ -158,7 +156,7 @@ class TestDagProcessorJobRunner:
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
- dag_directory=tmpdir,
+ dag_directory=path_to_parse.parent,
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
@@ -174,7 +172,7 @@ class TestDagProcessorJobRunner:
import_errors = session.query(errors.ImportError).all()
assert len(import_errors) == 1
- filename_to_parse.remove()
+ path_to_parse.unlink()
# Rerun the scheduler once the dag file has been removed
self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -187,25 +185,24 @@ class TestDagProcessorJobRunner:
parent_pipe.close()
@conf_vars({("core", "load_examples"): "False"})
- def test_max_runs_when_no_files(self):
+ def test_max_runs_when_no_files(self, tmp_path):
child_pipe, parent_pipe = multiprocessing.Pipe()
- with TemporaryDirectory(prefix="empty-airflow-dags-") as dags_folder:
- async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
- manager = DagProcessorJobRunner(
- job=Job(),
- processor=DagFileProcessorManager(
- dag_directory=dags_folder,
- max_runs=1,
- processor_timeout=timedelta(days=365),
- signal_conn=child_pipe,
- dag_ids=[],
- pickle_dags=False,
- async_mode=async_mode,
- ),
- )
+ async_mode = "sqlite" not in conf.get("database", "sql_alchemy_conn")
+ manager = DagProcessorJobRunner(
+ job=Job(),
+ processor=DagFileProcessorManager(
+ dag_directory=os.fspath(tmp_path),
+ max_runs=1,
+ processor_timeout=timedelta(days=365),
+ signal_conn=child_pipe,
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=async_mode,
+ ),
+ )
- self.run_processor_manager_one_loop(manager, parent_pipe)
+ self.run_processor_manager_one_loop(manager, parent_pipe)
child_pipe.close()
parent_pipe.close()
@@ -893,16 +890,15 @@ class TestDagProcessorJobRunner:
@conf_vars({("core", "load_examples"): "False"})
@mock.patch("airflow.dag_processing.manager.Stats.timing")
- def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmpdir):
- filename_to_parse = tmpdir / "temp_dag.py"
+ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmp_path):
+ path_to_parse = tmp_path / "temp_dag.py"
dag_code = dedent(
"""
from airflow import DAG
dag = DAG(dag_id='temp_dag', schedule='0 0 * * *')
"""
)
- with open(filename_to_parse, "w") as file_to_parse:
- file_to_parse.writelines(dag_code)
+ path_to_parse.write_text(dag_code)
child_pipe, parent_pipe = multiprocessing.Pipe()
@@ -910,7 +906,7 @@ class TestDagProcessorJobRunner:
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
- dag_directory=tmpdir,
+ dag_directory=path_to_parse.parent,
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
@@ -938,7 +934,7 @@ class TestDagProcessorJobRunner:
any_order=True,
)
- def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
+ def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path):
"""Test DagProcessorJobRunner._refresh_dag_dir method"""
manager = DagProcessorJobRunner(
job=Job(),
@@ -952,7 +948,7 @@ class TestDagProcessorJobRunner:
async_mode=True,
),
)
- dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+ dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
dagbag.process_file(zipped_dag_path)
dag = dagbag.get_dag("test_zip_dag")
@@ -967,7 +963,7 @@ class TestDagProcessorJobRunner:
# assert dag still active
assert dag.get_is_active()
- def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmpdir):
+ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path):
"""Test DagProcessorJobRunner._refresh_dag_dir method"""
manager = DagProcessorJobRunner(
job=Job(),
@@ -981,7 +977,7 @@ class TestDagProcessorJobRunner:
async_mode=True,
),
)
- dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+ dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
dagbag.process_file(zipped_dag_path)
dag = dagbag.get_dag("test_zip_dag")
@@ -1000,10 +996,10 @@ class TestDagProcessorJobRunner:
# assert dag deactivated
assert not dag.get_is_active()
- def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, tmpdir):
+ def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, tmp_path):
"""Test DagProcessorJobRunner._refresh_dag_dir should not update dags outside its processor_subdir"""
- dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+ dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
dag_path = os.path.join(TEST_DAGS_FOLDER, "test_miscellaneous.py")
dagbag.process_file(dag_path)
dag = dagbag.get_dag("miscellaneous_test_dag")
@@ -1040,7 +1036,7 @@ class TestDagProcessorJobRunner:
("scheduler", "standalone_dag_processor"): "True",
}
)
- def test_fetch_callbacks_from_database(self, tmpdir):
+ def test_fetch_callbacks_from_database(self, tmp_path):
"""Test DagProcessorJobRunner._fetch_callbacks method"""
dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
@@ -1048,20 +1044,20 @@ class TestDagProcessorJobRunner:
dag_id="test_start_date_scheduling",
full_filepath=str(dag_filepath),
is_failure_callback=True,
- processor_subdir=str(tmpdir),
+ processor_subdir=os.fspath(tmp_path),
run_id="123",
)
callback2 = DagCallbackRequest(
dag_id="test_start_date_scheduling",
full_filepath=str(dag_filepath),
is_failure_callback=True,
- processor_subdir=str(tmpdir),
+ processor_subdir=os.fspath(tmp_path),
run_id="456",
)
callback3 = SlaCallbackRequest(
dag_id="test_start_date_scheduling",
full_filepath=str(dag_filepath),
- processor_subdir=str(tmpdir),
+ processor_subdir=os.fspath(tmp_path),
)
with create_session() as session:
@@ -1073,7 +1069,7 @@ class TestDagProcessorJobRunner:
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
- dag_directory=str(tmpdir),
+ dag_directory=os.fspath(tmp_path),
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
@@ -1093,7 +1089,7 @@ class TestDagProcessorJobRunner:
("scheduler", "standalone_dag_processor"): "True",
}
)
- def test_fetch_callbacks_for_current_dag_directory_only(self, tmpdir):
+ def test_fetch_callbacks_for_current_dag_directory_only(self, tmp_path):
"""Test DagProcessorJobRunner._fetch_callbacks method"""
dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
@@ -1101,7 +1097,7 @@ class TestDagProcessorJobRunner:
dag_id="test_start_date_scheduling",
full_filepath=str(dag_filepath),
is_failure_callback=True,
- processor_subdir=str(tmpdir),
+ processor_subdir=os.fspath(tmp_path),
run_id="123",
)
callback2 = DagCallbackRequest(
@@ -1120,7 +1116,7 @@ class TestDagProcessorJobRunner:
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
- dag_directory=tmpdir,
+ dag_directory=tmp_path,
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
@@ -1141,7 +1137,7 @@ class TestDagProcessorJobRunner:
("core", "load_examples"): "False",
}
)
- def test_fetch_callbacks_from_database_max_per_loop(self, tmpdir):
+ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path):
"""Test DagProcessorJobRunner._fetch_callbacks method"""
dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
@@ -1152,7 +1148,7 @@ class TestDagProcessorJobRunner:
full_filepath=str(dag_filepath),
is_failure_callback=True,
run_id=str(i),
- processor_subdir=str(tmpdir),
+ processor_subdir=os.fspath(tmp_path),
)
session.add(DbCallbackRequest(callback=callback, priority_weight=i))
@@ -1160,7 +1156,7 @@ class TestDagProcessorJobRunner:
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
- dag_directory=str(tmpdir),
+ dag_directory=str(tmp_path),
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
@@ -1184,7 +1180,7 @@ class TestDagProcessorJobRunner:
("core", "load_examples"): "False",
}
)
- def test_fetch_callbacks_from_database_not_standalone(self, tmpdir):
+ def test_fetch_callbacks_from_database_not_standalone(self, tmp_path):
dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
with create_session() as session:
@@ -1192,7 +1188,7 @@ class TestDagProcessorJobRunner:
dag_id="test_start_date_scheduling",
full_filepath=str(dag_filepath),
is_failure_callback=True,
- processor_subdir=str(tmpdir),
+ processor_subdir=str(tmp_path),
run_id="123",
)
session.add(DbCallbackRequest(callback=callback, priority_weight=10))
@@ -1201,7 +1197,7 @@ class TestDagProcessorJobRunner:
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
- dag_directory=tmpdir,
+ dag_directory=tmp_path,
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
@@ -1219,7 +1215,7 @@ class TestDagProcessorJobRunner:
with create_session() as session:
assert session.query(DbCallbackRequest).count() == 1
- def test_callback_queue(self, tmpdir):
+ def test_callback_queue(self, tmp_path):
# given
manager = DagProcessorJobRunner(
job=Job(),
@@ -1239,7 +1235,7 @@ class TestDagProcessorJobRunner:
dag_id="dag1",
run_id="run1",
is_failure_callback=False,
- processor_subdir=tmpdir,
+ processor_subdir=tmp_path,
msg=None,
)
dag1_req2 = DagCallbackRequest(
@@ -1247,18 +1243,18 @@ class TestDagProcessorJobRunner:
dag_id="dag1",
run_id="run1",
is_failure_callback=False,
- processor_subdir=tmpdir,
+ processor_subdir=tmp_path,
msg=None,
)
dag1_sla1 = SlaCallbackRequest(
full_filepath="/green_eggs/ham/file1.py",
dag_id="dag1",
- processor_subdir=tmpdir,
+ processor_subdir=tmp_path,
)
dag1_sla2 = SlaCallbackRequest(
full_filepath="/green_eggs/ham/file1.py",
dag_id="dag1",
- processor_subdir=tmpdir,
+ processor_subdir=tmp_path,
)
dag2_req1 = DagCallbackRequest(
@@ -1266,14 +1262,14 @@ class TestDagProcessorJobRunner:
dag_id="dag2",
run_id="run1",
is_failure_callback=False,
- processor_subdir=tmpdir,
+ processor_subdir=tmp_path,
msg=None,
)
dag3_sla1 = SlaCallbackRequest(
full_filepath="/green_eggs/ham/file3.py",
dag_id="dag3",
- processor_subdir=tmpdir,
+ processor_subdir=tmp_path,
)
# when