Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/16 17:16:59 UTC
[GitHub] [airflow] Taragolis commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications
Taragolis commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1355246110
Just my 50 cents, based on past experience. In the projects I have worked on, we usually use callable objects instead of plain functions, simply for better reusability.
Something like this:
```python
from abc import abstractmethod
from typing import Any

from airflow.models.taskinstance import Context
from airflow.utils.log.logging_mixin import LoggingMixin


class BaseCallback(LoggingMixin):
    """Base callback for Airflow tasks"""

    @abstractmethod
    def callback(self, context: Context) -> None:
        pass

    def __call__(self, context: Context):
        try:
            self.callback(context)
        except Exception:
            self.log.exception("Error during callback")
            raise
```
After that, we can parametrise our pipelines and create callbacks like this:
```python
from typing import Optional

from airflow.providers.slack.hooks.slack import SlackHook


class SlackNotificationCallback(BaseCallback):
    def __init__(
        self,
        *,
        slack_conn_id: str,
        channel: str,
        template: str,
        template_type: str = "json",
        username: Optional[str] = None,
        icon_emoji: Optional[str] = None,
    ):
        super().__init__()
        self.slack_conn_id = slack_conn_id
        self.template = template
        self.template_type = template_type
        self.channel = channel
        self.username = username
        self.icon_emoji = icon_emoji

    def callback(self, context):
        hook = SlackHook(slack_conn_id=self.slack_conn_id)
        message = ...  # Some magic with Jinja here
        ...
        return hook.client.api_call("chat.postMessage", json=message)
```
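The "magic with Jinja" step is left open above. As a rough sketch only (not the implementation used in those projects), the rendering could be done with plain `jinja2`. The `render_message` helper, the `TEMPLATES` dict and the template body are hypothetical, and the in-memory `DictLoader` stands in for template files on disk:

```python
import json

from jinja2 import DictLoader, Environment, StrictUndefined

# Hypothetical in-memory template; a real setup would more likely load files.
TEMPLATES = {
    "on-failure.json": """
{
  "channel": {{ channel | tojson }},
  "text": {{ ("Task " ~ task_id ~ " failed for " ~ ds) | tojson }}
}
""",
}


def render_message(template_name, context, **extra):
    """Render a JSON template with the task context and parse the result."""
    # StrictUndefined makes a missing variable raise instead of rendering "".
    env = Environment(loader=DictLoader(TEMPLATES), undefined=StrictUndefined)
    rendered = env.get_template(template_name).render({**context, **extra})
    return json.loads(rendered)


# A plain dict stands in for the Airflow context here.
message = render_message(
    "on-failure.json",
    {"task_id": "awesome_task", "ds": "2022-12-16"},
    channel="#everything-fine",
)
```

The `tojson` filter keeps the rendered values valid JSON even if they contain quotes, so the result can be parsed and passed on as the message payload.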
And then use it in a task in different ways:
```python
from airflow.operators.empty import EmptyOperator

task = EmptyOperator(
    task_id="awesome_task",
    on_failure_callback=SlackNotificationCallback(
        slack_conn_id="slack_api_conn",
        channel="#everything-fine",
        template="foo/bar/on-failure-slack.yml",
        template_type="yaml",
    ),
    on_retry_callback=SlackNotificationCallback(
        slack_conn_id="slack_api_conn",
        channel="#spam-nobody-read-it",
        template="foo/bar/on-retry-slack.yml",
        template_type="yaml",
    ),
)
```
There are small limitations with this approach:
1. If a task requires more than one callback, you need to create a middleware callback that runs the other callbacks in a loop.
2. Templated fields cannot be used out of the box, so all the Jinja handling has to be implemented manually.
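
For the first limitation, a middleware callback could look roughly like the sketch below. `CompositeCallback` is a hypothetical name, and the choice to run every callback and then re-raise the first error is an assumption, not something settled in this thread. It is kept free of Airflow imports so it runs on its own:

```python
import logging
from typing import Any, Callable, Mapping

log = logging.getLogger(__name__)

# Any callable that accepts the task context, e.g. a BaseCallback instance.
Callback = Callable[[Mapping[str, Any]], Any]


class CompositeCallback:
    """Hypothetical middleware that runs several callbacks with one context."""

    def __init__(self, *callbacks: Callback) -> None:
        self.callbacks = callbacks

    def __call__(self, context: Mapping[str, Any]) -> None:
        errors = []
        for callback in self.callbacks:
            try:
                callback(context)
            except Exception as exc:
                # Keep going so one broken notifier does not silence the rest.
                log.exception("Callback %r failed", callback)
                errors.append(exc)
        if errors:
            # Assumption: surface the first failure after all callbacks ran.
            raise errors[0]
```

It would then be passed as a single value, for example `on_failure_callback=CompositeCallback(slack_callback, email_callback)`, where both arguments are placeholder callables.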