Posted to commits@airflow.apache.org by ep...@apache.org on 2023/10/29 22:40:35 UTC

(airflow) 43/44: Fix subtle bug in mocking processor_agent in our tests (#35221)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 69538406f3f1aef0389d5240df5aea11625bba61
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Fri Oct 27 21:31:14 2023 +0200

    Fix subtle bug in mocking processor_agent in our tests (#35221)
    
    Some of the scheduler tests tried to prevent the DAG processor from
    processing DAGs from the "tests/dags" directory by setting
    processor_agent to a Mock object:
    
    ```python
       self.job_runner.processor_agent = mock.MagicMock()
    ```
    
    This, in connection with the scheduler job cleaning all the tables
    and an approach similar to:
    
    ```python
            dag = self.dagbag.get_dag("test_retry_handling_job")
            dag_task1 = dag.get_task("test_retry_handling_op")
            dag.clear()
            dag.sync_to_db()
    ```
    
    Allowed the test to run in an isolated environment where only one or
    a few DAGs were present in the DB.
    
    This probably worked perfectly in the past, but after some changes in
    how DAGFileProcessor works it no longer prevented the DAGFileProcessor
    from running: when the `_execute` method of scheduler_job_runner was
    executed and the standalone dag processor was not running, the mocked
    processor_agent was overwritten by a new DagFileProcessorAgent in the
    `_execute` method of scheduler_job_runner:
    
    ```python
            if not self._standalone_dag_processor:
                self.processor_agent = DagFileProcessorAgent(
                    dag_directory=Path(self.subdir),
                    max_runs=self.num_times_parse_dags,
                    processor_timeout=processor_timeout,
                    dag_ids=[],
                    pickle_dags=pickle_dags,
                    async_mode=async_mode,
                )
    ```
    
    This led to a very subtle race condition which was more likely on
    machines with multiple cores/faster disks: for example, it led to
    #35204, which appeared on self-hosted (8-core) runners but did not
    appear on public (2-core) runners, or it could appear on an 8-core
    ARM Mac but not on a 6-core Intel Mac (only with sqlite).
    
    If the DAGFileProcessor managed to start, spawn some parsing
    processes, and grab DB write access for sqlite, and those processes
    managed to parse some of the DAG files from the tests/dags/ folder,
    those DAGs could have polluted the DAGs in the DB, leading to
    undesired effects (for example, the test hanging while the scheduler
    job run attempted to process an unwanted subdag and got deadlocked,
    as in #35204).
    
    The solution is to only set the processor_agent if it is not set
    already. It can only be set beforehand in unit tests, where the test
    assigns a Mock object to `processor_agent`. In "production" the agent
    is only set once, in the `_execute` method, so there is no risk
    involved in checking whether it is already set.
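
    For illustration, a minimal self-contained sketch of why the guard
    keeps a pre-set mock in place (`RunnerSketch` is a hypothetical
    stand-in for SchedulerJobRunner, not Airflow code):

    ```python
    from unittest import mock


    class RunnerSketch:
        """Hypothetical stand-in for SchedulerJobRunner (illustration only)."""

        def __init__(self, standalone_dag_processor=False):
            self._standalone_dag_processor = standalone_dag_processor
            self.processor_agent = None

        def _execute(self):
            # The guard added by this commit: only create a real agent
            # when none has been set already (e.g. by a test).
            if not self._standalone_dag_processor and not self.processor_agent:
                # stands in for DagFileProcessorAgent(...)
                self.processor_agent = object()


    runner = RunnerSketch()
    runner.processor_agent = mock.MagicMock()  # as the scheduler tests do
    runner._execute()
    # the mock is preserved, so no real DAG parsing is kicked off
    assert isinstance(runner.processor_agent, mock.MagicMock)
    ```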
    
    Fixes: #35204
    (cherry picked from commit 6f3d294645153db914be69cd2b2a49f12a18280c)
---
 airflow/jobs/scheduler_job_runner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 3b65dbfafa..3d06d509a4 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -809,7 +809,7 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
 
         processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
         processor_timeout = timedelta(seconds=processor_timeout_seconds)
-        if not self._standalone_dag_processor:
+        if not self._standalone_dag_processor and not self.processor_agent:
             self.processor_agent = DagFileProcessorAgent(
                 dag_directory=Path(self.subdir),
                 max_runs=self.num_times_parse_dags,