Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/12 09:33:58 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #19769: Handle stuck queued tasks in Celery

kaxil commented on a change in pull request #19769:
URL: https://github.com/apache/airflow/pull/19769#discussion_r782876872



##########
File path: airflow/executors/celery_executor.py
##########
@@ -377,6 +384,52 @@ def _check_for_stalled_adopted_tasks(self):
             for key in timedout_keys:
                 self.change_state(key, State.FAILED)
 
+    @provide_session
+    def _clear_stuck_queued_tasks(self, session=None):
+        """
+        Tasks can get stuck in the queued state in the DB without ever reaching
+        a worker. This happens when the worker is autoscaled down after the task
+        was queued but before any worker picked it up.
+
+        In such a situation, we update the task instance state to scheduled so that
+        it can be queued again. We use task_adoption_timeout to decide how long a
+        task may stay queued before it is considered stuck.
+        if not isinstance(app.backend, DatabaseBackend):
+            # We only want to do this for database backends where
+            # this case has been spotted
+            return
+        # We use this instead of bulk_state_fetcher because we
+        # may not have the stuck task in self.tasks, and we don't want
+        # to clear tasks in self.tasks too
+        session_ = app.backend.ResultSession()
+        task_cls = getattr(app.backend, "task_cls", TaskDb)
+        with session_cleanup(session_):
+            celery_task_ids = [t.task_id for t in session_.query(task_cls.task_id).all()]
+        self.log.debug("Checking for stuck queued tasks")
+
+        max_allowed_time = utcnow() - self.task_adoption_timeout
+
+        for task in session.query(TaskInstance).filter(
+            TaskInstance.state == State.QUEUED, TaskInstance.queued_dttm < max_allowed_time
+        ):
+
+            self.log.info("Checking task %s", task)

Review comment:
   ```suggestion
   ```
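The docstring in the hunk describes a time-based rule: a task instance that has stayed QUEUED for longer than task_adoption_timeout is moved back to SCHEDULED so it can be queued again. Below is a minimal, framework-free sketch of just that rule. `FakeTaskInstance`, `reset_stuck_queued` and the plain state strings are hypothetical stand-ins, not Airflow's `TaskInstance` model or `State` enum. The sketch also deliberately leaves out the Celery result-backend lookup (`celery_task_ids`), because the hunk is cut off before it shows how that list is used.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional

# Hypothetical stand-ins for Airflow's State.QUEUED / State.SCHEDULED values.
QUEUED = "queued"
SCHEDULED = "scheduled"


@dataclass
class FakeTaskInstance:
    """Hypothetical, minimal stand-in for a TaskInstance row (not Airflow's model)."""

    task_id: str
    state: str
    queued_dttm: Optional[datetime]


def reset_stuck_queued(
    tis: List[FakeTaskInstance], task_adoption_timeout: timedelta, now: datetime
) -> List[str]:
    """Move tasks that have been QUEUED longer than the timeout back to SCHEDULED.

    Returns the task_ids that were reset.
    """
    # Same cutoff as in the hunk: anything queued before this moment counts as stuck.
    max_allowed_time = now - task_adoption_timeout
    reset = []
    for ti in tis:
        # A missing queued_dttm is skipped, mirroring SQL, where NULL < x is never true.
        if ti.state == QUEUED and ti.queued_dttm is not None and ti.queued_dttm < max_allowed_time:
            ti.state = SCHEDULED  # makes the task eligible to be queued again
            reset.append(ti.task_id)
    return reset


# Usage with an arbitrary 10-minute timeout chosen for illustration.
now = datetime(2022, 1, 12, 10, 0, tzinfo=timezone.utc)
tis = [
    FakeTaskInstance("stuck", QUEUED, now - timedelta(minutes=20)),
    FakeTaskInstance("fresh", QUEUED, now - timedelta(minutes=2)),
    FakeTaskInstance("running", "running", now - timedelta(hours=1)),
]
print(reset_stuck_queued(tis, timedelta(minutes=10), now))  # ['stuck']
```

The comparison is kept strict (`<`), as in the hunk's filter, so a task queued exactly `task_adoption_timeout` ago is not yet treated as stuck.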




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.