You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/22 07:35:14 UTC
[GitHub] [airflow] BobDu commented on a diff in pull request #28198: Fix scheduler orm DetachedInstanceError when find zombies in standalone dag processor mode
BobDu commented on code in PR #28198:
URL: https://github.com/apache/airflow/pull/28198#discussion_r1055167804
##########
airflow/jobs/scheduler_job.py:
##########
@@ -1520,49 +1520,54 @@ def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
if num_timed_out_tasks:
self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
- @provide_session
- def _find_zombies(self, session: Session) -> None:
+ def _find_zombies(self) -> None:
"""
Find zombie task instances, which are tasks haven't heartbeated for too long
or have a no-longer-running LocalTaskJob, and create a TaskCallbackRequest
to be handled by the DAG processor.
"""
- from airflow.jobs.local_task_job import LocalTaskJob
-
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
- zombies = (
- session.query(TaskInstance, DagModel.fileloc)
- .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
- .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
- .join(DagModel, TaskInstance.dag_id == DagModel.dag_id)
- .filter(TaskInstance.state == TaskInstanceState.RUNNING)
- .filter(
- or_(
- LocalTaskJob.state != State.RUNNING,
- LocalTaskJob.latest_heartbeat < limit_dttm,
- )
- )
- .filter(TaskInstance.queued_by_job_id == self.id)
- .all()
- )
+ zombies = self._find_zombies_from_db(limit_dttm=limit_dttm)
if zombies:
self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
- for ti, file_loc in zombies:
+ for ti, file_loc, processor_subdir in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
full_filepath=file_loc,
- processor_subdir=ti.dag_model.processor_subdir,
+ processor_subdir=processor_subdir,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=str(zombie_message_details),
)
self.log.error("Detected zombie job: %s", request)
self.executor.send_callback(request)
Stats.incr("zombies_killed")
+ @provide_session
+ def _find_zombies_from_db(self, limit_dttm, session: Session = NEW_SESSION) -> list[tuple[TI, str, str]]:
Review Comment:
@uranusjr I've modified this according to your suggestion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org