You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/22 06:18:46 UTC

[GitHub] [airflow] uranusjr commented on a diff in pull request #28198: Fix scheduler orm DetachedInstanceError when find zombies in standalone dag processor mode

uranusjr commented on code in PR #28198:
URL: https://github.com/apache/airflow/pull/28198#discussion_r1055126536


##########
airflow/jobs/scheduler_job.py:
##########
@@ -1520,49 +1520,54 @@ def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
         if num_timed_out_tasks:
             self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
 
-    @provide_session
-    def _find_zombies(self, session: Session) -> None:
+    def _find_zombies(self) -> None:
         """
         Find zombie task instances, which are tasks haven't heartbeated for too long
         or have a no-longer-running LocalTaskJob, and create a TaskCallbackRequest
         to be handled by the DAG processor.
         """
-        from airflow.jobs.local_task_job import LocalTaskJob
-
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
 
-        zombies = (
-            session.query(TaskInstance, DagModel.fileloc)
-            .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
-            .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
-            .join(DagModel, TaskInstance.dag_id == DagModel.dag_id)
-            .filter(TaskInstance.state == TaskInstanceState.RUNNING)
-            .filter(
-                or_(
-                    LocalTaskJob.state != State.RUNNING,
-                    LocalTaskJob.latest_heartbeat < limit_dttm,
-                )
-            )
-            .filter(TaskInstance.queued_by_job_id == self.id)
-            .all()
-        )
+        zombies = self._find_zombies_from_db(limit_dttm=limit_dttm)
 
         if zombies:
             self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
 
-        for ti, file_loc in zombies:
+        for ti, file_loc, processor_subdir in zombies:
             zombie_message_details = self._generate_zombie_message_details(ti)
             request = TaskCallbackRequest(
                 full_filepath=file_loc,
-                processor_subdir=ti.dag_model.processor_subdir,
+                processor_subdir=processor_subdir,
                 simple_task_instance=SimpleTaskInstance.from_ti(ti),
                 msg=str(zombie_message_details),
             )
             self.log.error("Detected zombie job: %s", request)
             self.executor.send_callback(request)
             Stats.incr("zombies_killed")
 
+    @provide_session
+    def _find_zombies_from_db(self, limit_dttm, session: Session = NEW_SESSION) -> list[tuple[TI, str, str]]:

Review Comment:
   I don’t see why this needs `@provide_session`. It’d be better to do something like
   
   ```python
   with create_session() as session:
       zombies = ...
   ```
   
   in `_find_zombies`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org