Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/16 17:30:49 UTC

[GitHub] [airflow] jedcunningham commented on a change in pull request #21556: WIP: Fix stuck queued task in Celery

jedcunningham commented on a change in pull request #21556:
URL: https://github.com/apache/airflow/pull/21556#discussion_r808275160



##########
File path: airflow/executors/celery_executor.py
##########
@@ -342,7 +342,12 @@ def sync(self) -> None:
         if self.adopted_task_timeouts:
             self._check_for_stalled_adopted_tasks()
         if time.time() - self.stuck_tasks_last_check_time > self.stuck_queued_task_check_interval:
-            self._clear_stuck_queued_tasks()
+            try:
+                with timeout(seconds=OPERATION_TIMEOUT):

Review comment:
       I'm a little torn on this. Should we guard just the Celery query? I'm also not sure we should be silent about it.
   
   The default of 1s is also too short for this whole operation, imo.
   
   Being db backed, I'm not sure we should have this at all?
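   
To make the first two points concrete, here is a rough sketch of guarding only the result-backend query and logging when it times out, rather than wrapping all of `_clear_stuck_queued_tasks()` and staying silent. It is not the PR's code: the `timeout` context manager is a simplified, Unix-only stand-in for the helper used in the diff, and `OperationTimeout`, `fetch_celery_task_ids`, and `slow_query` are hypothetical names used only for illustration.

```python
import logging
import signal
import time
from contextlib import contextmanager

log = logging.getLogger("celery_executor_sketch")


class OperationTimeout(Exception):
    """Hypothetical exception raised when the guarded block overruns."""


@contextmanager
def timeout(seconds):
    # Simplified stand-in for the timeout helper in the diff.
    # Assumption: Unix only, and called from the main thread (SIGALRM).
    def _handler(signum, frame):
        raise OperationTimeout(f"timed out after {seconds}s")

    previous = signal.signal(signal.SIGALRM, _handler)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Always cancel the timer and restore the previous handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def fetch_celery_task_ids(query, seconds):
    """Run only the backend query under the timeout.

    Returns the query result, or None (after logging a warning)
    if the query did not finish in time.
    """
    try:
        with timeout(seconds):
            return query()
    except OperationTimeout:
        log.warning(
            "Celery result backend query timed out after %ss; "
            "skipping the stuck queued task check",
            seconds,
        )
        return None


def slow_query():
    # Hypothetical query that takes longer than the allowed time.
    time.sleep(2)
    return ["late"]
```

With this shape, `fetch_celery_task_ids(slow_query, 0.1)` logs a warning and returns `None`, so the caller can skip the check for this cycle while the rest of `sync()` runs without a time limit.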

##########
File path: airflow/executors/celery_executor.py
##########
@@ -398,28 +403,34 @@ def _clear_stuck_queued_tasks(self, session: Session = NEW_SESSION) -> None:
             # We only want to do this for database backends where
             # this case has been spotted
             return
-        # We use this instead of using bulk_state_fetcher because we
-        # may not have the stuck task in self.tasks and we don't want
-        # to clear task in self.tasks too
-        session_ = app.backend.ResultSession()
-        task_cls = getattr(app.backend, "task_cls", TaskDb)
-        with session_cleanup(session_):
-            celery_task_ids = [
-                t.task_id
-                for t in session_.query(task_cls.task_id)
-                .filter(~task_cls.status.in_([celery_states.SUCCESS, celery_states.FAILURE]))
-                .all()
-            ]
         self.log.debug("Checking for stuck queued tasks")
 
         max_allowed_time = utcnow() - self.task_adoption_timeout
 
-        for task in session.query(TaskInstance).filter(
-            TaskInstance.state == State.QUEUED, TaskInstance.queued_dttm < max_allowed_time
-        ):
-            if task.key in self.queued_tasks or task.key in self.running:
-                continue

Review comment:
       Why don't we want this short-circuit any longer?
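
For reference, the removed short-circuit skipped any task instance the executor was still tracking, so only untracked queued tasks were considered stuck. Below is a minimal sketch of that filter over plain Python data; `find_stuck_keys` and the `(key, queued_dttm)` tuples are hypothetical simplifications, not Airflow's `TaskInstance` query.

```python
from datetime import datetime, timedelta


def find_stuck_keys(task_instances, queued_tasks, running, max_allowed_time):
    """Return keys queued before max_allowed_time that the executor
    is not already tracking in queued_tasks or running."""
    stuck = []
    for key, queued_dttm in task_instances:
        # The short-circuit in question: the executor still knows about
        # this task, so it is not treated as stuck.
        if key in queued_tasks or key in running:
            continue
        if queued_dttm < max_allowed_time:
            stuck.append(key)
    return stuck


now = datetime(2022, 2, 16, 17, 30)
max_allowed_time = now - timedelta(minutes=10)
task_instances = [
    ("tracked", now - timedelta(hours=1)),    # old, but still tracked
    ("untracked", now - timedelta(hours=1)),  # old and unknown to executor
    ("fresh", now),                           # queued too recently
]
stuck = find_stuck_keys(task_instances, {"tracked"}, set(), max_allowed_time)
```

Without the `continue`, `"tracked"` would also land in `stuck` even though the executor has not lost it.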



