You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "okayhooni (via GitHub)" <gi...@apache.org> on 2023/03/08 05:50:22 UTC

[GitHub] [airflow] okayhooni commented on pull request #28943: Add handling logic on CeleryExecutor to reschedule task stuck in queued status

okayhooni commented on PR #28943:
URL: https://github.com/apache/airflow/pull/28943#issuecomment-1459579711

   @Tonkonozhenko 
   Sorry for the late response. I implemented a simple killer operator like the one below. (We currently use OpsGenie for alerting.)
   ```python
   from datetime import datetime, timedelta
   from textwrap import dedent
   from typing import Any, Dict, List, Optional
   
   import pytz
   from airflow.models import BaseOperator
   from airflow.models.taskinstance import TaskInstance
   from airflow.providers.opsgenie.hooks.opsgenie import OpsgenieAlertHook
   from airflow.utils.session import NEW_SESSION, provide_session
   from airflow.utils.state import State
   from sqlalchemy.orm import Session
   
   # Default age (time since ``queued_dttm``) after which a QUEUED task instance is treated as a zombie.
   DEFAULT_ZOMBIE_THRESHOLD_TIMEDELTA = timedelta(minutes=30)
   # Default state written onto detected zombie task instances.
   DEFAULT_ZOMBIE_MARKING_STATE = State.FAILED
   # Default priority of the OpsGenie alert sent for detected zombies.
   DEFAULT_ZOMBIE_OPSGENIE_ALERT_PRIORITY = "P3"
   
   
   class ZombieTaskKillerOperator(BaseOperator):
       """
       Mark zombie tasks stuck in the QUEUED state as a specified state (FAILED by default).

       A task instance is considered a zombie when its state is QUEUED and its ``queued_dttm`` is at
       least ``zombie_threshold_timedelta`` older than the current UTC time.

       Related issues: https://github.com/apache/airflow/issues?q=is%3Aissue+is%3Aopen+stuck+queued
   
       :param zombie_threshold_timedelta: target threshold timedelta for deciding whether this task is zombie or not
       :param zombie_marking_state: marking state of detected queued zombie tasks, default to State.FAILED (= "failed")
       :param only_on_current_dag: whether kill tasks only on the current DAG or not, default to True
       :param opsgenie_conn_id: Airflow connection ID for alerting with OpsGenie. if not provided, there is no alert
       :param opsgenie_priority: The priority of the OpsGenie alert. possible values are: P1, P2, P3, P4, P5, default to P3
       """
   
       def __init__(
           self,
           *args,
           zombie_threshold_timedelta: timedelta = DEFAULT_ZOMBIE_THRESHOLD_TIMEDELTA,
           zombie_marking_state: State = DEFAULT_ZOMBIE_MARKING_STATE,
           only_on_current_dag: bool = True,
           opsgenie_conn_id: Optional[str] = None,
           opsgenie_priority: str = DEFAULT_ZOMBIE_OPSGENIE_ALERT_PRIORITY,
           **kwargs,
       ):
           super().__init__(*args, **kwargs)
           self.zombie_threshold_timedelta = zombie_threshold_timedelta
           self.zombie_marking_state = zombie_marking_state
           self.only_on_current_dag = only_on_current_dag
           self.opsgenie_conn_id = opsgenie_conn_id
           self.opsgenie_priority = opsgenie_priority
   
       def execute(self, context: Dict[str, Any]) -> None:
           """
           Find and mark queued zombies, then alert on OpsGenie.

           The alert is only sent when at least one zombie was found and ``opsgenie_conn_id`` is set.

           :param context: Airflow task context; ``context["task_instance"].log_url`` is attached to the alert
           """
           queued_zombies = self._find_and_kill_queued_zombies()
   
           if queued_zombies and self.opsgenie_conn_id:
               self._alert_queued_zombies(zombie_tasks=queued_zombies, log_url=context["task_instance"].log_url)
   
       @provide_session
       def _find_and_kill_queued_zombies(
           self, *, session: Session = NEW_SESSION
       ) -> Optional[List[TaskInstance]]:
           """
           Find queued zombies and update their state on the Airflow meta database.
   
           :param session: Sqlalchemy ORM Session (provided on default)
           :return: rows of the zombie task instances as selected BEFORE the update (columns: dag_id,
               run_id, task_id, operator, state, queued_dttm, queue, pool), or None if no zombie exists
           """
           cur_utc_datetime = datetime.now(pytz.utc)
           self.log.info("[cur_utc_datetime]: %s", cur_utc_datetime)
   
           queued_zombie_target_filters = [
               TaskInstance.state == State.QUEUED,
               TaskInstance.queued_dttm <= cur_utc_datetime - self.zombie_threshold_timedelta,
           ]  # queued_dttm field is loaded on metadb with UTC timezone
   
           if self.only_on_current_dag:
               queued_zombie_target_filters.append(TaskInstance.dag_id == self.dag_id)
   
           queued_zombies_query = session.query(
               TaskInstance.dag_id,
               TaskInstance.run_id,
               TaskInstance.task_id,
               TaskInstance.operator,
               TaskInstance.state,
               TaskInstance.queued_dttm,
               TaskInstance.queue,
               TaskInstance.pool,
           ).filter(*queued_zombie_target_filters)
   
           queued_zombies = queued_zombies_query.all()
           if not queued_zombies:
               self.log.info("CLEAR! CURRENTLY, THERE IS NO QUEUED ZOMBIE FOUND")
               return None
   
           self.log.warning("[Queued Zombies]: %s", queued_zombies)
           # The UPDATE re-evaluates the same filters, so it can match fewer rows than the SELECT above
           # if some task instances left the QUEUED state in between; keep the count to surface that.
           updated_count = queued_zombies_query.update(
               {TaskInstance.state: self.zombie_marking_state}, synchronize_session="fetch"
           )
           self.log.warning("MARKING QUEUED ZOMBIES TO `%s` STATE", self.zombie_marking_state)
           if updated_count != len(queued_zombies):
               self.log.warning(
                   "[Queued Zombies]: selected %d row(s) but the update matched %d row(s)",
                   len(queued_zombies),
                   updated_count,
               )
   
           return queued_zombies
   
       def _alert_queued_zombies(self, *, zombie_tasks: List[TaskInstance], log_url: str) -> None:
           """
           Alert on OpsGenie about queued zombie tasks marked as the specified state.
   
           :param zombie_tasks: list of zombie task instances, returned from sqlalchemy ORM query
           :param log_url: url of this zombie killer task log
           """
           # Build each entry line by line so no stray indentation or trailing whitespace from the
           # source layout leaks into the alert text.
           zombie_tasks_description = [
               "\n".join(
                   (
                       f"[CAUGHT ZOMBIE - {idx}]",
                       f"*DAG*: {zombie.dag_id}",
                       f"*Run ID*: {zombie.run_id}",
                       f"*Task*: {zombie.task_id}",
                       f"*Queued Time*: {zombie.queued_dttm}",
                   )
               )
               for idx, zombie in enumerate(zombie_tasks, 1)
           ]
   
           alert_msg_json = {
               "message": f"[Airflow] Marking {len(zombie_tasks)} queued zombie task(s) as `{self.zombie_marking_state}`",
               "description": "\n=================================\n".join(zombie_tasks_description),
               "details": {"Logs": log_url},
               "source": "airflow",
               "priority": self.opsgenie_priority,
               "tags": ["airflow"],
           }
   
           opsgenie_hook = OpsgenieAlertHook(self.opsgenie_conn_id)
           opsgenie_hook.create_alert(alert_msg_json)
   ```
   
   Then I deployed this custom operator in a DAG as shown below (running at a 15-minute interval):
   ```python
       # Example deployment: zombies are marked SCHEDULED instead of the default FAILED
       # (presumably so the scheduler queues them again -- confirm for your Airflow version).
       zombie_killer = ZombieTaskKillerOperator(
           task_id="zombie_killer",
           zombie_marking_state=State.SCHEDULED,
           zombie_threshold_timedelta=timedelta(minutes=29),
           only_on_current_dag=False,  # do not restrict the search to this DAG's task instances
           opsgenie_conn_id="ALARM_OPSGENIE_DATAPLATFORM",
           opsgenie_priority="P5",
       )
   ```
   
   It's not a fancy or graceful solution, but it works well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org