You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "okayhooni (via GitHub)" <gi...@apache.org> on 2023/03/08 05:50:22 UTC
[GitHub] [airflow] okayhooni commented on pull request #28943: Add handling logic on CeleryExecutor to reschedule task stuck in queued status
okayhooni commented on PR #28943:
URL: https://github.com/apache/airflow/pull/28943#issuecomment-1459579711
@Tonkonozhenko
Sorry for the late response. I implemented a simple killer operator, shown below. (We currently use OpsGenie for alerting.)
```python
from datetime import datetime, timedelta
from textwrap import dedent
from typing import Any, Dict, List, Optional
import pytz
from airflow.models import BaseOperator
from airflow.models.taskinstance import TaskInstance
from airflow.providers.opsgenie.hooks.opsgenie import OpsgenieAlertHook
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import State
from sqlalchemy.orm import Session
# Default for ZombieTaskKillerOperator's `zombie_threshold_timedelta`: how long a task instance
# must have been sitting in the QUEUED state before it is considered a zombie.
DEFAULT_ZOMBIE_THRESHOLD_TIMEDELTA = timedelta(minutes=30)
# Default for `zombie_marking_state`: the state that detected queued zombies are set to.
DEFAULT_ZOMBIE_MARKING_STATE = State.FAILED
# Default for `opsgenie_priority`: priority of the OpsGenie alert (the operator documents P1-P5 as valid).
DEFAULT_ZOMBIE_OPSGENIE_ALERT_PRIORITY = "P3"
class ZombieTaskKillerOperator(BaseOperator):
    """
    Mark task instances stuck in the QUEUED state as a specified state (FAILED by default).

    A task instance is treated as a "queued zombie" when its state is QUEUED and its
    ``queued_dttm`` is at least ``zombie_threshold_timedelta`` in the past. Matching rows are
    updated directly in the Airflow metadata database and, when an OpsGenie connection is
    configured, a single alert summarising them is created.

    related issues: https://github.com/apache/airflow/issues?q=is%3Aissue+is%3Aopen+stuck+queued

    :param zombie_threshold_timedelta: target threshold timedelta for deciding whether this task is zombie or not
    :param zombie_marking_state: marking state of detected queued zombie tasks, default to State.FAILED (= "failed")
    :param only_on_current_dag: whether kill tasks only on the current DAG or not, default to True
    :param opsgenie_conn_id: Airflow connection ID for alerting with OpsGenie. if not provided, there is no alert
    :param opsgenie_priority: The priority of the OpsGenie alert. possible values are: P1, P2, P3, P4, P5, default to P3
    """

    # The OpsGenie Alert API documents a 15000-character limit for the "description" field.
    # With many zombies the joined description can exceed it, so it is capped before sending.
    _OPSGENIE_DESCRIPTION_MAX_LENGTH = 15000
    _OPSGENIE_TRUNCATION_NOTICE = "\n... (truncated, see task log for the full list)"

    def __init__(
        self,
        *args,
        zombie_threshold_timedelta: timedelta = DEFAULT_ZOMBIE_THRESHOLD_TIMEDELTA,
        zombie_marking_state: State = DEFAULT_ZOMBIE_MARKING_STATE,
        only_on_current_dag: bool = True,
        opsgenie_conn_id: Optional[str] = None,
        opsgenie_priority: str = DEFAULT_ZOMBIE_OPSGENIE_ALERT_PRIORITY,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.zombie_threshold_timedelta = zombie_threshold_timedelta
        self.zombie_marking_state = zombie_marking_state
        self.only_on_current_dag = only_on_current_dag
        self.opsgenie_conn_id = opsgenie_conn_id
        self.opsgenie_priority = opsgenie_priority

    def execute(self, context: Dict[str, Any]) -> None:
        """
        Find and re-mark queued zombies, then alert on OpsGenie if anything was found.

        :param context: Airflow task context; ``context["task_instance"].log_url`` is attached
            to the alert so the reader can jump to this task's log
        """
        queued_zombies = self._find_and_kill_queued_zombies()
        # No alert when nothing was found or when no OpsGenie connection is configured.
        if queued_zombies and self.opsgenie_conn_id:
            self._alert_queued_zombies(zombie_tasks=queued_zombies, log_url=context["task_instance"].log_url)

    @provide_session
    def _find_and_kill_queued_zombies(self, *, session: Session = NEW_SESSION) -> List[Any]:
        """
        Find queued zombies and update their state in the Airflow metadata database.

        :param session: Sqlalchemy ORM Session (provided on default)
        :return: list of result rows (one per zombie) exposing ``dag_id``, ``run_id``, ``task_id``,
            ``operator``, ``state``, ``queued_dttm``, ``queue`` and ``pool``; empty list if none found
        """
        cur_utc_datetime = datetime.now(pytz.utc)
        self.log.info("[cur_utc_datetime]: %s", cur_utc_datetime)

        queued_zombie_target_filters = [
            TaskInstance.state == State.QUEUED,
            TaskInstance.queued_dttm <= cur_utc_datetime - self.zombie_threshold_timedelta,
        ]  # queued_dttm field is loaded on metadb with UTC timezone
        if self.only_on_current_dag:
            # Restrict the sweep to the DAG this operator itself belongs to.
            queued_zombie_target_filters.append(TaskInstance.dag_id == self.dag_id)

        queued_zombies_query = session.query(
            TaskInstance.dag_id,
            TaskInstance.run_id,
            TaskInstance.task_id,
            TaskInstance.operator,
            TaskInstance.state,
            TaskInstance.queued_dttm,
            TaskInstance.queue,
            TaskInstance.pool,
        ).filter(*queued_zombie_target_filters)

        # Snapshot the matching rows BEFORE the update: afterwards they no longer match the
        # QUEUED filter, and the snapshot is what gets logged and alerted on.
        queued_zombies = queued_zombies_query.all()
        if not queued_zombies:
            self.log.info("CLEAR! CURRENTLY, THERE IS NO QUEUED ZOMBIE FOUND")
            return []

        self.log.warning("[Queued Zombies]: %s", queued_zombies)
        queued_zombies_query.update(
            {TaskInstance.state: self.zombie_marking_state}, synchronize_session="fetch"
        )
        self.log.warning("MARKING QUEUED ZOMBIES TO `%s` STATE", self.zombie_marking_state)
        return queued_zombies

    def _alert_queued_zombies(self, *, zombie_tasks: List[Any], log_url: str) -> None:
        """
        Create one OpsGenie alert describing the queued zombies that were re-marked.

        :param zombie_tasks: list of zombie rows returned from the sqlalchemy ORM query; each must
            expose ``dag_id``, ``run_id``, ``task_id`` and ``queued_dttm``
        :param log_url: url of this zombie killer task log
        """
        # Built line by line (rather than dedent-ing an f-string) so the text does not depend on
        # source indentation or on the content of the interpolated values.
        zombie_tasks_description = [
            "\n".join(
                (
                    f"[CAUGHT ZOMBIE - {idx}]",
                    f"*DAG*: {zombie.dag_id}",
                    f"*Run ID*: {zombie.run_id}",
                    f"*Task*: {zombie.task_id}",
                    f"*Queued Time*: {zombie.queued_dttm}",
                )
            )
            for idx, zombie in enumerate(zombie_tasks, 1)
        ]
        description = "\n=================================\n".join(zombie_tasks_description)
        if len(description) > self._OPSGENIE_DESCRIPTION_MAX_LENGTH:
            # Keep the alert deliverable; the full list is still in the task log (see "Logs").
            keep = self._OPSGENIE_DESCRIPTION_MAX_LENGTH - len(self._OPSGENIE_TRUNCATION_NOTICE)
            description = description[:keep] + self._OPSGENIE_TRUNCATION_NOTICE

        alert_msg_json = {
            "message": f"[Airflow] Marking {len(zombie_tasks)} queued zombie task(s) as `{self.zombie_marking_state}`",
            "description": description,
            "details": {"Logs": log_url},
            "source": "airflow",
            "priority": self.opsgenie_priority,
            "tags": ["airflow"],
        }
        opsgenie_hook = OpsgenieAlertHook(self.opsgenie_conn_id)
        opsgenie_hook.create_alert(alert_msg_json)
```
Then I deployed this custom operator in a DAG as shown below (running at a 15-minute interval).
```python
# Example deployment of ZombieTaskKillerOperator:
# - queued zombies are set back to SCHEDULED instead of the default FAILED
# - a task instance counts as a zombie once it has been queued for 29 minutes or more
# - the sweep covers every DAG, not only the DAG that contains this task
# - an OpsGenie alert with priority P5 is sent through the given connection ID
zombie_killer = ZombieTaskKillerOperator(
task_id="zombie_killer",
zombie_marking_state=State.SCHEDULED,
zombie_threshold_timedelta=timedelta(minutes=29),
only_on_current_dag=False,
opsgenie_conn_id="ALARM_OPSGENIE_DATAPLATFORM",
opsgenie_priority="P5",
)
```
It's not a fancy or graceful solution, but it works well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org