You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/16 11:08:39 UTC
[GitHub] [airflow] ashb opened a new issue, #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications
ashb opened a new issue, #28405:
URL: https://github.com/apache/airflow/issues/28405
In response to someone asking this on the mailing list I proposed this API: https://lists.apache.org/thread.html/rcd7fbd99408f5cdc1cc09e8a1619b629743da158ecf4c9d0eeb411f1%40%3Cdev.airflow.apache.org%3E
```python
from airflow.providers.slack.notifiers import send_slack_message
task = MyOperator(
task_id = "something",
on_failure_callback= send_slack_message(
slack_conn_id='slack-default', # Default, not actually required
here, for example only
channels=['#data-ops'],
mentions=['@ash'],
),
)
```
Names can be changed/discussed, this is just a rough idea.
Using this approach means that a) We don't need to add a whole bunch of
new config, and b) it can be easily extended/created by providers
without needing changes to core -- after all the feature to run code
after failure/success already exists, we just want to "package up" the
common task of sending a slack message.
I think for consistency of interface too we should deprecate the
email_on_failure task attribute too in favour of a similar function.
And finally, perhaps `on_*_callback` gets extended to allow a list of
functions instead of just one.
_Originally posted by @ashb in https://github.com/apache/airflow/issues/12611#issuecomment-849625657_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] potiuk commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1354593506
Love the idea.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ephraimbuddy closed issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications
Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications
URL: https://github.com/apache/airflow/issues/28405
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] potiuk commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1359649621
@dinigo I believe this is exactly what has been discussed in #12611 .
Those two concepts (of "per-task notification" and "airflow system-wide notification") are easilly confused and it happened already in #12611. So let's not mix those two - if you think you want "general notification" you should discuss it there.
BUT
The #12611 has been closed (by me) because I believe Airflow 2.5.0 "listener" API https://airflow.apache.org/docs/apache-airflow/stable/listeners.html allows you to do exactly what you want. You can write your own notifiers, as comples or as difficult you want and using whatever mechanism you wantt for notification - by plugging in your own listener. You can even contribute such listeners back to Airflow if you think they might be useful for others (and for example add them to providers - similarly like elasticsearch, cloudwatch etc. logging handlers have been added there). While listener API has not been foreseen for this initially, I believe it perfectly fits the needs you have. I suggest you migrate to Airflow 2.5.0 and try it even now to see if it works. If you will find otherwise - feel free to comment in #12611 about your findings and why you think it's not good enough.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] dinigo commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications
Posted by GitBox <gi...@apache.org>.
dinigo commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1359555364
In general, I think the option for Dependency Injection (like we do with logging, email, lineage...) is very good, and I'll go with it rather than polluting the "user-space" (DAG)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] Taragolis commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications
Posted by GitBox <gi...@apache.org>.
Taragolis commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1355246110
Just my 50 cents based on past experience. In projects where I worked we usual use callable objects instead of just functions just for additional re-usability.
Something like this
```python
from abc import abstractmethod
from typing import Any
from airflow.models.taskinstance import Context
from airflow.utils.log.logging_mixin import LoggingMixin
class BaseCallback(LoggingMixin):
"""Base callback for Airflow tasks"""
@abstractmethod
def callback(self, context: Context) -> None:
pass
def __call__(self, context: Context):
try:
self.callback(context)
except Exception:
self.log.exception("Error during callback")
raise
```
So after that we can parametrise our pipelines and create callback like this
```python
from airflow.providers.slack.hooks.slack import SlackHook
class SlackNotificationCallback(BaseCallback):
def __init__(
self,
*,
slack_conn_id: str,
channel: str,
template: str,
template_type: str = "json",
username: str = None,
icon_emoji: str = None,
):
super().__init__()
self.slack_conn_id = slack_conn_id
self.template = template
self.template_type = template_type
self.channel = channel
self.username = username
self.icon_emoji = icon_emoji
def callback(self, context):
hook = SlackHook(slack_conn_id=self.slack_conn_id)
message = ... # Some magic with Jinja here
...
return hook.client.api_call("chat.postMessage", json=message)
```
And use it in task by different way
```python
task = EmptyOperator(
task_id = "awesome_task",
on_failure_callback=SlackNotificationCallback(
slack_conn_id="slack_api_conn",
channel="#everything-fine",
template="foo/bar/on-failure-slack.yml"
template_type="yaml",
),
on_retry_callback=SlackNotificationCallback(
slack_conn_id="slack_api_conn",
channel="#spam-nobody-read-it",
template="foo/bar/on-retry-slack.yml"
template_type="yaml",
)
)
```
The small limitation with this approach:
1. In case of task required more than one callback then need to create middleware Callback which run other callbacks in a loop
2. Unable out of the box use templated fields as result need to implements all jinja stuff manually
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] dinigo commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications
Posted by GitBox <gi...@apache.org>.
dinigo commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1359549462
When I opened the issue there was no such thing as "Google Cloud Composer", It has now integrated logs and alarms. There are many workaround still.
The issue remains cause I see people implementing the "alarming/monitoring/SRE" as part of the "Data Engineering/Orchestration". Those are different concerns and I think there should be some "alarming by default"
I suggestion was to have a common notification interface, similar to the email one (`airflow/utils/notification.py`):
```python
# inspired by https://github.com/apache/airflow/blob/main/airflow/utils/email.py
# see https://airflow.apache.org/docs/apache-airflow/stable/howto/email-config.html
class NotificationInterface(ABC):
@abstractmethod
def notify(
self,
conn_id: str = conf.get("notification", "CONN_ID", fallback=None),
message: str = conf.get("notification", "MESSAGE_TEMPLATE", fallback=None),
to: List[str] = conf.get("notification", "TO", fallback=None),
from_user: str = conf.get("notification", "FROM_USER", fallback=None),
):
"""
1. apply jinja to message
2. retrieve the connection
3. use the connection hook/method
"""
Then implement this interface, for example for Slack (`airflow/providers/slack/notification.py`)
```python
class SlackNotifier(NotificationInterface):
def notify(
self,
conn_id: str = conf.get("notification", "CONN_ID", fallback=SlackHook.default_conn_name),
message: str = conf.get("notification", "MESSAGE_TEMPLATE", fallback=None),
to: List[str] = conf.get("notification", "TO", fallback=None),
from_user: str = conf.get("notification", "FROM_USER", fallback='airflow')
):
if message.endswith('.txt'):
message = open(message).read()
environment = jinja2.Environment()
template = environment.from_string(message)
rendered_message = template.render(from_user=from_user, to=to)
slack_client = SlackHook(conn_id).client
for channel in to:
slack_client.chat_postMessage(
channel=channel,
text=rendered_message,
username=from_user,
)
```
So that you can easily configure it from the `airflow.cfg` (or env variables):
```toml
[notification]
class="airflow.providers.slack.notification"
conn_id="slack_api_conn"
to=['#airflow-notifications']
message = "foo/bar/slack-message.txt"
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org