You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ta...@apache.org on 2024/04/13 11:13:07 UTC
(airflow) branch main updated: Resolve internal warnings for TestLocalTaskJob and TestSigTermOnRunner (#38893)
This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 28bd256829 Resolve internal warnings for TestLocalTaskJob and TestSigTermOnRunner (#38893)
28bd256829 is described below
commit 28bd25682943aa8b082dd48b401a627a5de35d94
Author: Owen Leung <ow...@gmail.com>
AuthorDate: Sat Apr 13 19:12:59 2024 +0800
Resolve internal warnings for TestLocalTaskJob and TestSigTermOnRunner (#38893)
* Resolve internal warnings for TestLocalTaskJob and TestSigTermOnRunner
* Use tmp_path fixture
---
tests/deprecations_ignore.yml | 13 -----
tests/jobs/test_local_task_job.py | 103 +++++++++++++++++++++++---------------
2 files changed, 64 insertions(+), 52 deletions(-)
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index c8f02a193d..3c1c8c0efc 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -92,19 +92,6 @@
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_subdag_clear_parentdag_downstream_clear
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_update_counters
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfilling_dags
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_dagrun_timeout_logged_in_task_logs
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_failure_callback_called_by_airflow_run_raw_process
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_fast_follow
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_heartbeat_failed_fast
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_local_task_return_code_metric
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_localtaskjob_double_trigger
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_localtaskjob_maintain_heart_rate
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_mark_failure_on_failure_callback
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_mark_success_no_kill
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_mark_success_on_success_callback
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_mini_scheduler_works_with_wait_for_upstream
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_process_os_signal_calls_on_failure_callback
-- tests/jobs/test_local_task_job.py::TestSigtermOnRunner::test_process_sigterm_works_with_retries
- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_adopt_or_reset_orphaned_tasks
- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run
- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_deadlock_ignore_depends_on_past
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index d2188e83d1..db0e53db36 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -26,7 +26,6 @@ import signal
import threading
import time
import uuid
-import warnings
from unittest import mock
from unittest.mock import patch
@@ -60,6 +59,7 @@ from tests.test_utils.mock_executor import MockExecutor
pytestmark = pytest.mark.db_test
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+DEFAULT_LOGICAL_DATE = timezone.coerce_datetime(DEFAULT_DATE)
TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
@@ -293,13 +293,14 @@ class TestLocalTaskJob:
task_id = "test_heartbeat_failed_fast_op"
dag = self.dagbag.get_dag(dag_id)
task = dag.get_task(task_id)
-
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dr = dag.create_dagrun(
run_id="test_heartbeat_failed_fast_run",
state=State.RUNNING,
execution_date=DEFAULT_DATE,
start_date=DEFAULT_DATE,
session=session,
+ data_interval=data_interval,
)
ti = dr.task_instances[0]
@@ -327,11 +328,13 @@ class TestLocalTaskJob:
the task to fail, and that the task exits
"""
dag = get_test_dag("test_mark_state")
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dr = dag.create_dagrun(
state=State.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
+ data_interval=data_interval,
)
task = dag.get_task(task_id="test_mark_success_no_kill")
@@ -352,6 +355,7 @@ class TestLocalTaskJob:
def test_localtaskjob_double_trigger(self):
dag = self.dagbag.dags.get("test_localtaskjob_double_trigger")
task = dag.get_task("test_localtaskjob_double_trigger_task")
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
session = settings.Session()
@@ -362,6 +366,7 @@ class TestLocalTaskJob:
execution_date=DEFAULT_DATE,
start_date=DEFAULT_DATE,
session=session,
+ data_interval=data_interval,
)
ti = dr.get_task_instance(task_id=task.task_id, session=session)
@@ -388,9 +393,10 @@ class TestLocalTaskJob:
@patch.object(StandardTaskRunner, "return_code")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr", autospec=True)
def test_local_task_return_code_metric(self, mock_stats_incr, mock_return_code, create_dummy_dag):
- _, task = create_dummy_dag("test_localtaskjob_code")
+ dag, task = create_dummy_dag("test_localtaskjob_code")
+ dag_run = dag.get_last_dagrun()
- ti_run = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+ ti_run = TaskInstance(task=task, run_id=dag_run.run_id)
ti_run.refresh_from_db()
job1 = Job(dag_id=ti_run.dag_id, executor=SequentialExecutor())
job_runner = LocalTaskJobRunner(job=job1, task_instance=ti_run)
@@ -418,9 +424,10 @@ class TestLocalTaskJob:
@patch.object(StandardTaskRunner, "return_code")
def test_localtaskjob_maintain_heart_rate(self, mock_return_code, caplog, create_dummy_dag):
- _, task = create_dummy_dag("test_localtaskjob_double_trigger")
+ dag, task = create_dummy_dag("test_localtaskjob_double_trigger")
+ dag_run = dag.get_last_dagrun()
- ti_run = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+ ti_run = TaskInstance(task=task, run_id=dag_run.run_id)
ti_run.refresh_from_db()
job1 = Job(dag_id=ti_run.dag_id, executor=SequentialExecutor())
job_runner = LocalTaskJobRunner(job=job1, task_instance=ti_run)
@@ -453,12 +460,14 @@ class TestLocalTaskJob:
the task, and executes on_failure_callback
"""
dag = get_test_dag("test_mark_state")
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
with create_session() as session:
dr = dag.create_dagrun(
state=State.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
+ data_interval=data_interval,
)
task = dag.get_task(task_id="test_mark_failure_externally")
ti = dr.get_task_instance(task.task_id)
@@ -484,6 +493,7 @@ class TestLocalTaskJob:
"""
dag = get_test_dag("test_mark_state")
dag.dagrun_timeout = datetime.timedelta(microseconds=1)
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
with create_session() as session:
dr = dag.create_dagrun(
state=State.RUNNING,
@@ -491,6 +501,7 @@ class TestLocalTaskJob:
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
+ data_interval=data_interval,
)
task = dag.get_task(task_id="test_mark_skipped_externally")
ti = dr.get_task_instance(task.task_id)
@@ -515,15 +526,17 @@ class TestLocalTaskJob:
callback_file.touch()
monkeypatch.setenv("AIRFLOW_CALLBACK_FILE", str(callback_file))
dag = get_test_dag("test_on_failure_callback")
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
with create_session() as session:
- dag.create_dagrun(
+ dr = dag.create_dagrun(
state=State.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
+ data_interval=data_interval,
)
task = dag.get_task(task_id="test_on_failure_callback_task")
- ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+ ti = TaskInstance(task=task, run_id=dr.run_id)
ti.refresh_from_db()
job1 = Job(executor=SequentialExecutor(), dag_id=ti.dag_id)
@@ -546,12 +559,14 @@ class TestLocalTaskJob:
on_success_callback gets executed
"""
dag = get_test_dag("test_mark_state")
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
with create_session() as session:
dr = dag.create_dagrun(
state=State.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
+ data_interval=data_interval,
)
task = dag.get_task(task_id="test_mark_success_no_kill")
@@ -583,15 +598,18 @@ class TestLocalTaskJob:
# callback_file will be created by the task: bash_sleep
monkeypatch.setenv("AIRFLOW_CALLBACK_FILE", str(callback_file))
dag = get_test_dag("test_on_failure_callback")
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
with create_session() as session:
dag.create_dagrun(
state=State.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
+ data_interval=data_interval,
)
task = dag.get_task(task_id="bash_sleep")
- ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+ dag_run = dag.get_last_dagrun()
+ ti = TaskInstance(task=task, run_id=dag_run.run_id)
ti.refresh_from_db()
signal_sent_status = {"sent": False}
@@ -724,7 +742,7 @@ class TestLocalTaskJob:
session.merge(ti)
ti_by_task_id[task_id] = ti
- ti = TaskInstance(task=dag.get_task(task_ids_to_run[0]), execution_date=dag_run.execution_date)
+ ti = TaskInstance(task=dag.get_task(task_ids_to_run[0]), run_id=dag_run.run_id)
ti.refresh_from_db()
job1 = Job(executor=SequentialExecutor(), dag_id=ti.dag_id)
job_runner = LocalTaskJobRunner(job=job1, task_instance=ti, ignore_ti_state=True)
@@ -733,9 +751,7 @@ class TestLocalTaskJob:
run_job(job=job1, execute_callable=job_runner._execute)
self.validate_ti_states(dag_run, first_run_state, error_message)
if second_run_state:
- ti = TaskInstance(
- task=dag.get_task(task_ids_to_run[1]), execution_date=dag_run.execution_date
- )
+ ti = TaskInstance(task=dag.get_task(task_ids_to_run[1]), run_id=dag_run.run_id)
ti.refresh_from_db()
job2 = Job(dag_id=ti.dag_id, executor=SequentialExecutor())
job_runner = LocalTaskJobRunner(job=job2, task_instance=ti, ignore_ti_state=True)
@@ -748,12 +764,18 @@ class TestLocalTaskJob:
@conf_vars({("scheduler", "schedule_after_task_execution"): "True"})
def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, get_test_dag):
dag = get_test_dag("test_dagrun_fast_follow")
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dag.catchup = False
SerializedDagModel.write_dag(dag)
- dr = dag.create_dagrun(run_id="test_1", state=State.RUNNING, execution_date=DEFAULT_DATE)
+ dr = dag.create_dagrun(
+ run_id="test_1", state=State.RUNNING, execution_date=DEFAULT_DATE, data_interval=data_interval
+ )
dr2 = dag.create_dagrun(
- run_id="test_2", state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+ run_id="test_2",
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
+ data_interval=data_interval,
)
task_k = dag.get_task("K")
task_l = dag.get_task("L")
@@ -870,9 +892,7 @@ class TestSigtermOnRunner:
pytest.param("spawn", 30, id="spawn"),
],
)
- def test_process_sigterm_works_with_retries(
- self, mp_method, wait_timeout, daemon, clear_db, request, capfd
- ):
+ def test_process_sigterm_works_with_retries(self, mp_method, wait_timeout, daemon, clear_db, tmp_path):
"""Test that ensures that task runner sets tasks to retry when task runner receive SIGTERM."""
mp_context = mp.get_context(mp_method)
@@ -885,11 +905,19 @@ class TestSigtermOnRunner:
task_id = "test_on_retry_callback"
execution_date = DEFAULT_DATE
run_id = f"test-{execution_date.date().isoformat()}"
-
+ tmp_file = tmp_path / "test.txt"
# Run LocalTaskJob in separate process
proc = mp_context.Process(
target=self._sigterm_local_task_runner,
- args=(dag_id, task_id, run_id, execution_date, task_started, retry_callback_called),
+ args=(
+ tmp_file,
+ dag_id,
+ task_id,
+ run_id,
+ execution_date,
+ task_started,
+ retry_callback_called,
+ ),
name="LocalTaskJob-TestProcess",
daemon=daemon,
)
@@ -913,26 +941,17 @@ class TestSigtermOnRunner:
# and fact that process with LocalTaskJob could be already killed.
# We could add state validation (`UP_FOR_RETRY`) if callback mechanism changed.
- pytest_capture = request.config.option.capture
- if pytest_capture == "no":
- # Since we run `LocalTaskJob` in the separate process we can grab ut easily by `caplog`.
- # However, we could grab it from stdout/stderr but only if `-s` flag set, see:
- # https://github.com/pytest-dev/pytest/issues/5997
- captured = capfd.readouterr()
- for msg in [
- "Received SIGTERM. Terminating subprocesses",
- "Task exited with return code 143",
- ]:
- assert msg in captured.out or msg in captured.err
- else:
- warnings.warn(
- f"Skip test logs in stdout/stderr when capture enabled: {pytest_capture}, "
- f"please pass `-s` option.",
- UserWarning,
- )
+ captured = tmp_file.read_text()
+ for msg in [
+ "Received SIGTERM. Terminating subprocesses",
+ "Task exited with return code 143",
+ ]:
+ # assert msg in captured.out or msg in captured.err
+ assert msg in captured
@staticmethod
def _sigterm_local_task_runner(
+ tmpfile_path,
dag_id,
task_id,
run_id,
@@ -963,9 +982,15 @@ class TestSigtermOnRunner:
retries=1,
on_retry_callback=retry_callback,
)
+ logger = logging.getLogger()
+ tmpfile_handler = logging.FileHandler(tmpfile_path)
+ logger.addHandler(tmpfile_handler)
- dag.create_dagrun(state=State.RUNNING, run_id=run_id, execution_date=execution_date)
- ti = TaskInstance(task=task, execution_date=execution_date)
+ data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
+ dag_run = dag.create_dagrun(
+ state=State.RUNNING, run_id=run_id, execution_date=execution_date, data_interval=data_interval
+ )
+ ti = TaskInstance(task=task, run_id=dag_run.run_id)
ti.refresh_from_db()
job = Job(executor=SequentialExecutor(), dag_id=ti.dag_id)
job_runner = LocalTaskJobRunner(job=job, task_instance=ti, ignore_ti_state=True)