You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ta...@apache.org on 2024/04/13 11:13:07 UTC

(airflow) branch main updated: Resolve internal warnings for TestLocalTaskJob and TestSigTermOnRunner (#38893)

This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 28bd256829 Resolve internal warnings for TestLocalTaskJob and TestSigTermOnRunner (#38893)
28bd256829 is described below

commit 28bd25682943aa8b082dd48b401a627a5de35d94
Author: Owen Leung <ow...@gmail.com>
AuthorDate: Sat Apr 13 19:12:59 2024 +0800

    Resolve internal warnings for TestLocalTaskJob and TestSigTermOnRunner (#38893)
    
    * Resolve internal warnings for TestLocalTaskJob and TestSigTermOnRunner
    
    * Use tmp_path fixture
---
 tests/deprecations_ignore.yml     |  13 -----
 tests/jobs/test_local_task_job.py | 103 +++++++++++++++++++++++---------------
 2 files changed, 64 insertions(+), 52 deletions(-)

diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index c8f02a193d..3c1c8c0efc 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -92,19 +92,6 @@
 - tests/jobs/test_backfill_job.py::TestBackfillJob::test_subdag_clear_parentdag_downstream_clear
 - tests/jobs/test_backfill_job.py::TestBackfillJob::test_update_counters
 - tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfilling_dags
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_dagrun_timeout_logged_in_task_logs
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_failure_callback_called_by_airflow_run_raw_process
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_fast_follow
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_heartbeat_failed_fast
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_local_task_return_code_metric
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_localtaskjob_double_trigger
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_localtaskjob_maintain_heart_rate
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_mark_failure_on_failure_callback
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_mark_success_no_kill
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_mark_success_on_success_callback
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_mini_scheduler_works_with_wait_for_upstream
-- tests/jobs/test_local_task_job.py::TestLocalTaskJob::test_process_os_signal_calls_on_failure_callback
-- tests/jobs/test_local_task_job.py::TestSigtermOnRunner::test_process_sigterm_works_with_retries
 - tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_adopt_or_reset_orphaned_tasks
 - tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run
 - tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_deadlock_ignore_depends_on_past
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index d2188e83d1..db0e53db36 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -26,7 +26,6 @@ import signal
 import threading
 import time
 import uuid
-import warnings
 from unittest import mock
 from unittest.mock import patch
 
@@ -60,6 +59,7 @@ from tests.test_utils.mock_executor import MockExecutor
 pytestmark = pytest.mark.db_test
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+DEFAULT_LOGICAL_DATE = timezone.coerce_datetime(DEFAULT_DATE)
 TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
 
 
@@ -293,13 +293,14 @@ class TestLocalTaskJob:
             task_id = "test_heartbeat_failed_fast_op"
             dag = self.dagbag.get_dag(dag_id)
             task = dag.get_task(task_id)
-
+            data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
             dr = dag.create_dagrun(
                 run_id="test_heartbeat_failed_fast_run",
                 state=State.RUNNING,
                 execution_date=DEFAULT_DATE,
                 start_date=DEFAULT_DATE,
                 session=session,
+                data_interval=data_interval,
             )
 
             ti = dr.task_instances[0]
@@ -327,11 +328,13 @@ class TestLocalTaskJob:
         the task to fail, and that the task exits
         """
         dag = get_test_dag("test_mark_state")
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         dr = dag.create_dagrun(
             state=State.RUNNING,
             execution_date=DEFAULT_DATE,
             run_type=DagRunType.SCHEDULED,
             session=session,
+            data_interval=data_interval,
         )
         task = dag.get_task(task_id="test_mark_success_no_kill")
 
@@ -352,6 +355,7 @@ class TestLocalTaskJob:
     def test_localtaskjob_double_trigger(self):
         dag = self.dagbag.dags.get("test_localtaskjob_double_trigger")
         task = dag.get_task("test_localtaskjob_double_trigger_task")
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
 
         session = settings.Session()
 
@@ -362,6 +366,7 @@ class TestLocalTaskJob:
             execution_date=DEFAULT_DATE,
             start_date=DEFAULT_DATE,
             session=session,
+            data_interval=data_interval,
         )
 
         ti = dr.get_task_instance(task_id=task.task_id, session=session)
@@ -388,9 +393,10 @@ class TestLocalTaskJob:
     @patch.object(StandardTaskRunner, "return_code")
     @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr", autospec=True)
     def test_local_task_return_code_metric(self, mock_stats_incr, mock_return_code, create_dummy_dag):
-        _, task = create_dummy_dag("test_localtaskjob_code")
+        dag, task = create_dummy_dag("test_localtaskjob_code")
+        dag_run = dag.get_last_dagrun()
 
-        ti_run = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti_run = TaskInstance(task=task, run_id=dag_run.run_id)
         ti_run.refresh_from_db()
         job1 = Job(dag_id=ti_run.dag_id, executor=SequentialExecutor())
         job_runner = LocalTaskJobRunner(job=job1, task_instance=ti_run)
@@ -418,9 +424,10 @@ class TestLocalTaskJob:
 
     @patch.object(StandardTaskRunner, "return_code")
     def test_localtaskjob_maintain_heart_rate(self, mock_return_code, caplog, create_dummy_dag):
-        _, task = create_dummy_dag("test_localtaskjob_double_trigger")
+        dag, task = create_dummy_dag("test_localtaskjob_double_trigger")
+        dag_run = dag.get_last_dagrun()
 
-        ti_run = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti_run = TaskInstance(task=task, run_id=dag_run.run_id)
         ti_run.refresh_from_db()
         job1 = Job(dag_id=ti_run.dag_id, executor=SequentialExecutor())
         job_runner = LocalTaskJobRunner(job=job1, task_instance=ti_run)
@@ -453,12 +460,14 @@ class TestLocalTaskJob:
         the task, and executes on_failure_callback
         """
         dag = get_test_dag("test_mark_state")
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         with create_session() as session:
             dr = dag.create_dagrun(
                 state=State.RUNNING,
                 execution_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
                 session=session,
+                data_interval=data_interval,
             )
         task = dag.get_task(task_id="test_mark_failure_externally")
         ti = dr.get_task_instance(task.task_id)
@@ -484,6 +493,7 @@ class TestLocalTaskJob:
         """
         dag = get_test_dag("test_mark_state")
         dag.dagrun_timeout = datetime.timedelta(microseconds=1)
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         with create_session() as session:
             dr = dag.create_dagrun(
                 state=State.RUNNING,
@@ -491,6 +501,7 @@ class TestLocalTaskJob:
                 execution_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
                 session=session,
+                data_interval=data_interval,
             )
         task = dag.get_task(task_id="test_mark_skipped_externally")
         ti = dr.get_task_instance(task.task_id)
@@ -515,15 +526,17 @@ class TestLocalTaskJob:
         callback_file.touch()
         monkeypatch.setenv("AIRFLOW_CALLBACK_FILE", str(callback_file))
         dag = get_test_dag("test_on_failure_callback")
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         with create_session() as session:
-            dag.create_dagrun(
+            dr = dag.create_dagrun(
                 state=State.RUNNING,
                 execution_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
                 session=session,
+                data_interval=data_interval,
             )
         task = dag.get_task(task_id="test_on_failure_callback_task")
-        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti = TaskInstance(task=task, run_id=dr.run_id)
         ti.refresh_from_db()
 
         job1 = Job(executor=SequentialExecutor(), dag_id=ti.dag_id)
@@ -546,12 +559,14 @@ class TestLocalTaskJob:
         on_success_callback gets executed
         """
         dag = get_test_dag("test_mark_state")
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         with create_session() as session:
             dr = dag.create_dagrun(
                 state=State.RUNNING,
                 execution_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
                 session=session,
+                data_interval=data_interval,
             )
         task = dag.get_task(task_id="test_mark_success_no_kill")
 
@@ -583,15 +598,18 @@ class TestLocalTaskJob:
         # callback_file will be created by the task: bash_sleep
         monkeypatch.setenv("AIRFLOW_CALLBACK_FILE", str(callback_file))
         dag = get_test_dag("test_on_failure_callback")
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         with create_session() as session:
             dag.create_dagrun(
                 state=State.RUNNING,
                 execution_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
                 session=session,
+                data_interval=data_interval,
             )
         task = dag.get_task(task_id="bash_sleep")
-        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        dag_run = dag.get_last_dagrun()
+        ti = TaskInstance(task=task, run_id=dag_run.run_id)
         ti.refresh_from_db()
 
         signal_sent_status = {"sent": False}
@@ -724,7 +742,7 @@ class TestLocalTaskJob:
                     session.merge(ti)
                     ti_by_task_id[task_id] = ti
 
-            ti = TaskInstance(task=dag.get_task(task_ids_to_run[0]), execution_date=dag_run.execution_date)
+            ti = TaskInstance(task=dag.get_task(task_ids_to_run[0]), run_id=dag_run.run_id)
             ti.refresh_from_db()
             job1 = Job(executor=SequentialExecutor(), dag_id=ti.dag_id)
             job_runner = LocalTaskJobRunner(job=job1, task_instance=ti, ignore_ti_state=True)
@@ -733,9 +751,7 @@ class TestLocalTaskJob:
             run_job(job=job1, execute_callable=job_runner._execute)
             self.validate_ti_states(dag_run, first_run_state, error_message)
             if second_run_state:
-                ti = TaskInstance(
-                    task=dag.get_task(task_ids_to_run[1]), execution_date=dag_run.execution_date
-                )
+                ti = TaskInstance(task=dag.get_task(task_ids_to_run[1]), run_id=dag_run.run_id)
                 ti.refresh_from_db()
                 job2 = Job(dag_id=ti.dag_id, executor=SequentialExecutor())
                 job_runner = LocalTaskJobRunner(job=job2, task_instance=ti, ignore_ti_state=True)
@@ -748,12 +764,18 @@ class TestLocalTaskJob:
     @conf_vars({("scheduler", "schedule_after_task_execution"): "True"})
     def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, get_test_dag):
         dag = get_test_dag("test_dagrun_fast_follow")
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         dag.catchup = False
         SerializedDagModel.write_dag(dag)
 
-        dr = dag.create_dagrun(run_id="test_1", state=State.RUNNING, execution_date=DEFAULT_DATE)
+        dr = dag.create_dagrun(
+            run_id="test_1", state=State.RUNNING, execution_date=DEFAULT_DATE, data_interval=data_interval
+        )
         dr2 = dag.create_dagrun(
-            run_id="test_2", state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+            run_id="test_2",
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
+            data_interval=data_interval,
         )
         task_k = dag.get_task("K")
         task_l = dag.get_task("L")
@@ -870,9 +892,7 @@ class TestSigtermOnRunner:
             pytest.param("spawn", 30, id="spawn"),
         ],
     )
-    def test_process_sigterm_works_with_retries(
-        self, mp_method, wait_timeout, daemon, clear_db, request, capfd
-    ):
+    def test_process_sigterm_works_with_retries(self, mp_method, wait_timeout, daemon, clear_db, tmp_path):
         """Test that ensures that task runner sets tasks to retry when task runner receive SIGTERM."""
         mp_context = mp.get_context(mp_method)
 
@@ -885,11 +905,19 @@ class TestSigtermOnRunner:
         task_id = "test_on_retry_callback"
         execution_date = DEFAULT_DATE
         run_id = f"test-{execution_date.date().isoformat()}"
-
+        tmp_file = tmp_path / "test.txt"
         # Run LocalTaskJob in separate process
         proc = mp_context.Process(
             target=self._sigterm_local_task_runner,
-            args=(dag_id, task_id, run_id, execution_date, task_started, retry_callback_called),
+            args=(
+                tmp_file,
+                dag_id,
+                task_id,
+                run_id,
+                execution_date,
+                task_started,
+                retry_callback_called,
+            ),
             name="LocalTaskJob-TestProcess",
             daemon=daemon,
         )
@@ -913,26 +941,17 @@ class TestSigtermOnRunner:
         # and fact that process with LocalTaskJob could be already killed.
         # We could add state validation (`UP_FOR_RETRY`) if callback mechanism changed.
 
-        pytest_capture = request.config.option.capture
-        if pytest_capture == "no":
-            # Since we run `LocalTaskJob` in the separate process we can grab ut easily by `caplog`.
-            # However, we could grab it from stdout/stderr but only if `-s` flag set, see:
-            # https://github.com/pytest-dev/pytest/issues/5997
-            captured = capfd.readouterr()
-            for msg in [
-                "Received SIGTERM. Terminating subprocesses",
-                "Task exited with return code 143",
-            ]:
-                assert msg in captured.out or msg in captured.err
-        else:
-            warnings.warn(
-                f"Skip test logs in stdout/stderr when capture enabled: {pytest_capture}, "
-                f"please pass `-s` option.",
-                UserWarning,
-            )
+        captured = tmp_file.read_text()
+        for msg in [
+            "Received SIGTERM. Terminating subprocesses",
+            "Task exited with return code 143",
+        ]:
+            # assert msg in captured.out or msg in captured.err
+            assert msg in captured
 
     @staticmethod
     def _sigterm_local_task_runner(
+        tmpfile_path,
         dag_id,
         task_id,
         run_id,
@@ -963,9 +982,15 @@ class TestSigtermOnRunner:
                 retries=1,
                 on_retry_callback=retry_callback,
             )
+        logger = logging.getLogger()
+        tmpfile_handler = logging.FileHandler(tmpfile_path)
+        logger.addHandler(tmpfile_handler)
 
-        dag.create_dagrun(state=State.RUNNING, run_id=run_id, execution_date=execution_date)
-        ti = TaskInstance(task=task, execution_date=execution_date)
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
+        dag_run = dag.create_dagrun(
+            state=State.RUNNING, run_id=run_id, execution_date=execution_date, data_interval=data_interval
+        )
+        ti = TaskInstance(task=task, run_id=dag_run.run_id)
         ti.refresh_from_db()
         job = Job(executor=SequentialExecutor(), dag_id=ti.dag_id)
         job_runner = LocalTaskJobRunner(job=job, task_instance=ti, ignore_ti_state=True)