You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/11 22:38:25 UTC

[airflow] branch v2-5-test updated: Fix `DetachedInstanceError` when finding zombies in Dag Parsing process (#28198)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-5-test by this push:
     new d59a479a11 Fix `DetachedInstanceError` when finding zombies in Dag Parsing process (#28198)
d59a479a11 is described below

commit d59a479a1154f9d803177af43fe4875d6d6c7459
Author: Bob Du <i...@bobdu.cc>
AuthorDate: Thu Dec 22 20:36:39 2022 +0800

    Fix `DetachedInstanceError` when finding zombies in Dag Parsing process (#28198)
    
    ```
    [2022-12-06T14:20:21.622+0000] {base_job.py:229} DEBUG - [heartbeat]
    [2022-12-06T14:20:21.623+0000] {scheduler_job.py:1495} DEBUG - Finding 'running' jobs without a recent heartbeat
    [2022-12-06T14:20:21.637+0000] {scheduler_job.py:1515} WARNING - Failing (2) jobs without heartbeat after 2022-12-06 14:15:21.623199+00:00
    [2022-12-06T14:20:21.641+0000] {scheduler_job.py:1526} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/xxx_dag.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'xxx', 'Task Id': 'xxx', 'Run Id': 'scheduled__2022-12-05T00:15:00+00:00', 'Hostname': 'airflow-worker-0.airflow-worker.airflow2.svc.cluster.local', 'External Executor Id': '9520cb9f-3245-497a-8e17-e9dec29d4549'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object a [...]
    [2022-12-06T14:20:21.645+0000] {scheduler_job.py:763} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
    Traceback (most recent call last):
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
        self._run_scheduler_loop()
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
        next_event = timers.run(blocking=False)
      File "/usr/local/lib/python3.10/sched.py", line 151, in run
        action(*argument, **kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
        action(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
        return func(*args, session=session, **kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
        processor_subdir=ti.dag_model.processor_subdir,
      File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
        return self.impl.get(state, dict_)
      File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
        value = self._fire_loader_callables(state, key, passive)
      File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
        return self.callable_(state, passive)
      File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
        raise orm_exc.DetachedInstanceError(
    sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
    [2022-12-06T14:20:21.647+0000] {celery_executor.py:443} DEBUG - Inquiring about 5 celery task(s)
    [2022-12-06T14:20:21.669+0000] {celery_executor.py:602} DEBUG - Fetched 5 state(s) for 5 task(s)
    [2022-12-06T14:20:21.669+0000] {celery_executor.py:446} DEBUG - Inquiries completed.
    [2022-12-06T14:20:21.669+0000] {scheduler_job.py:775} INFO - Exited execute loop
    [2022-12-06T14:20:21.674+0000] {cli_action_loggers.py:83} DEBUG - Calling callbacks: []
    Traceback (most recent call last):
      File "/home/airflow/.local/bin/airflow", line 8, in <module>
        sys.exit(main())
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 39, in main
        args.func(args)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 52, in command
        return func(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 103, in wrapper
        return f(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 85, in scheduler
        _run_scheduler_job(args=args)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 50, in _run_scheduler_job
        job.run()
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 247, in run
        self._execute()
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
        self._run_scheduler_loop()
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
        next_event = timers.run(blocking=False)
      File "/usr/local/lib/python3.10/sched.py", line 151, in run
        action(*argument, **kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
        action(*args, **kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
        return func(*args, session=session, **kwargs)
      File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
        processor_subdir=ti.dag_model.processor_subdir,
      File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
        return self.impl.get(state, dict_)
      File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
        value = self._fire_loader_callables(state, key, passive)
      File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
        return self.callable_(state, passive)
      File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
        raise orm_exc.DetachedInstanceError(
    sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
    ```
    
    When running in standalone dag processor mode, the scheduler will use `DatabaseCallbackSink`.
    
    The `_find_zombies` function calls `self.executor.send_callback(request)`, but it does not
    propagate the ORM `session`; `provide_session` opens a new session again inside the `send` function,
    so the `TaskInstance` objects become detached from their original session.
    
    ```
    class DatabaseCallbackSink(BaseCallbackSink):
        """Sends callbacks to database."""
    
        @provide_session
        def send(self, callback: CallbackRequest, session: Session = NEW_SESSION) -> None:
            """Sends callback for execution."""
            db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
            session.add(db_callback)
    ```
    
    Signed-off-by: BobDu <i...@bobdu.cc>
    (cherry picked from commit 4b340b7561e9db0055bf69ad0fc8b3a508ea7667)
---
 airflow/jobs/scheduler_job.py    | 33 ++++++++++++++++----------------
 tests/jobs/test_scheduler_job.py | 41 ++++++++++++++++++++--------------------
 2 files changed, 36 insertions(+), 38 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 6fe8d3d710..baeabdc2ec 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1510,8 +1510,7 @@ class SchedulerJob(BaseJob):
         if num_timed_out_tasks:
             self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
 
-    @provide_session
-    def _find_zombies(self, session: Session) -> None:
+    def _find_zombies(self) -> None:
         """
         Find zombie task instances, which are tasks haven't heartbeated for too long
         or have a no-longer-running LocalTaskJob, and create a TaskCallbackRequest
@@ -1522,30 +1521,30 @@ class SchedulerJob(BaseJob):
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
 
-        zombies = (
-            session.query(TaskInstance, DagModel.fileloc)
-            .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
-            .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
-            .join(DagModel, TaskInstance.dag_id == DagModel.dag_id)
-            .filter(TaskInstance.state == TaskInstanceState.RUNNING)
-            .filter(
-                or_(
-                    LocalTaskJob.state != State.RUNNING,
-                    LocalTaskJob.latest_heartbeat < limit_dttm,
+        with create_session() as session:
+            zombies: list[tuple[TI, str, str]] = (
+                session.query(TI, DM.fileloc, DM.processor_subdir)
+                .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
+                .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
+                .filter(TI.state == TaskInstanceState.RUNNING)
+                .filter(
+                    or_(
+                        LocalTaskJob.state != State.RUNNING,
+                        LocalTaskJob.latest_heartbeat < limit_dttm,
+                    )
                 )
+                .filter(TI.queued_by_job_id == self.id)
+                .all()
             )
-            .filter(TaskInstance.queued_by_job_id == self.id)
-            .all()
-        )
 
         if zombies:
             self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
 
-        for ti, file_loc in zombies:
+        for ti, file_loc, processor_subdir in zombies:
             zombie_message_details = self._generate_zombie_message_details(ti)
             request = TaskCallbackRequest(
                 full_filepath=file_loc,
-                processor_subdir=ti.dag_model.processor_subdir,
+                processor_subdir=processor_subdir,
                 simple_task_instance=SimpleTaskInstance.from_ti(ti),
                 msg=str(zombie_message_details),
             )
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b0c8e016c8..f04a76f57e 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -4129,14 +4129,13 @@ class TestSchedulerJob:
         assert ti2.state == State.DEFERRED
 
     def test_find_zombies_nothing(self):
-        with create_session() as session:
-            executor = MockExecutor(do_update=False)
-            self.scheduler_job = SchedulerJob(executor=executor)
-            self.scheduler_job.processor_agent = mock.MagicMock()
+        executor = MockExecutor(do_update=False)
+        self.scheduler_job = SchedulerJob(executor=executor)
+        self.scheduler_job.processor_agent = mock.MagicMock()
 
-            self.scheduler_job._find_zombies(session=session)
+        self.scheduler_job._find_zombies()
 
-            self.scheduler_job.executor.callback_sink.send.assert_not_called()
+        self.scheduler_job.executor.callback_sink.send.assert_not_called()
 
     def test_find_zombies(self, load_examples):
         dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
@@ -4179,20 +4178,21 @@ class TestSchedulerJob:
             ti.queued_by_job_id = self.scheduler_job.id
             session.flush()
 
-            self.scheduler_job._find_zombies(session=session)
+        self.scheduler_job._find_zombies()
 
-            self.scheduler_job.executor.callback_sink.send.assert_called_once()
-            requests = self.scheduler_job.executor.callback_sink.send.call_args[0]
-            assert 1 == len(requests)
-            assert requests[0].full_filepath == dag.fileloc
-            assert requests[0].msg == str(self.scheduler_job._generate_zombie_message_details(ti))
-            assert requests[0].is_failure_callback is True
-            assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
-            assert ti.dag_id == requests[0].simple_task_instance.dag_id
-            assert ti.task_id == requests[0].simple_task_instance.task_id
-            assert ti.run_id == requests[0].simple_task_instance.run_id
-            assert ti.map_index == requests[0].simple_task_instance.map_index
+        self.scheduler_job.executor.callback_sink.send.assert_called_once()
+        requests = self.scheduler_job.executor.callback_sink.send.call_args[0]
+        assert 1 == len(requests)
+        assert requests[0].full_filepath == dag.fileloc
+        assert requests[0].msg == str(self.scheduler_job._generate_zombie_message_details(ti))
+        assert requests[0].is_failure_callback is True
+        assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
+        assert ti.dag_id == requests[0].simple_task_instance.dag_id
+        assert ti.task_id == requests[0].simple_task_instance.task_id
+        assert ti.run_id == requests[0].simple_task_instance.run_id
+        assert ti.map_index == requests[0].simple_task_instance.map_index
 
+        with create_session() as session:
             session.query(TaskInstance).delete()
             session.query(LocalTaskJob).delete()
 
@@ -4267,12 +4267,11 @@ class TestSchedulerJob:
         Check that the same set of failure callback with zombies are passed to the dag
         file processors until the next zombie detection logic is invoked.
         """
-        with conf_vars({("core", "load_examples"): "False"}):
+        with conf_vars({("core", "load_examples"): "False"}), create_session() as session:
             dagbag = DagBag(
                 dag_folder=os.path.join(settings.DAGS_FOLDER, "test_example_bash_operator.py"),
                 read_dags_from_db=False,
             )
-            session = settings.Session()
             session.query(LocalTaskJob).delete()
             dag = dagbag.get_dag("test_example_bash_operator")
             dag.sync_to_db(processor_subdir=TEST_DAG_FOLDER)
@@ -4301,7 +4300,7 @@ class TestSchedulerJob:
         self.scheduler_job.executor = MockExecutor()
         self.scheduler_job.processor_agent = mock.MagicMock()
 
-        self.scheduler_job._find_zombies(session=session)
+        self.scheduler_job._find_zombies()
 
         self.scheduler_job.executor.callback_sink.send.assert_called_once()