You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/16 11:08:39 UTC

[GitHub] [airflow] ashb opened a new issue, #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications

ashb opened a new issue, #28405:
URL: https://github.com/apache/airflow/issues/28405

           In response to someone asking this on the mailing list I proposed this API: https://lists.apache.org/thread.html/rcd7fbd99408f5cdc1cc09e8a1619b629743da158ecf4c9d0eeb411f1%40%3Cdev.airflow.apache.org%3E
   
   ```python
   from airflow.providers.slack.notifiers import send_slack_message
   
   task = MyOperator(
       task_id = "something",
       on_failure_callback= send_slack_message(
           slack_conn_id='slack-default', # Default, not actually required 
   here, for example only
           channels=['#data-ops'],
           mentions=['@ash'],
       ),
   )
   ```
   
   Names can be changed/discussed, this is just a rough idea.
   
   Using this approach means that a) We don't need to add a whole bunch of 
   new config, and b) it can be easily extended/created by providers 
   without needing changes to core -- after all the feature to run code 
   after failure/success already exists, we just want to "package up" the 
   common task of sending a slack message.
   
   I think for consistency of interface too we should deprecate the 
   email_on_failure task attribute too in favour of a similar function.
   
   And finally, perhaps `on_*_callback` gets extended to allow a list of 
   functions instead of just one.
   
   _Originally posted by @ashb in https://github.com/apache/airflow/issues/12611#issuecomment-849625657_
         


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1354593506

   Love the idea. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ephraimbuddy closed issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications

Posted by GitBox <gi...@apache.org>.
ephraimbuddy closed issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications
URL: https://github.com/apache/airflow/issues/28405


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1359649621

   @dinigo I believe this is exactly what has been discussed in #12611 . 
   
   Those two concepts (of "per-task notification" and "airflow system-wide notification") are easilly confused and it happened already in #12611. So let's not mix those two - if you think you want "general notification" you should discuss it there. 
   
   BUT
   
   The #12611 has been closed (by me) because I believe Airflow 2.5.0 "listener" API https://airflow.apache.org/docs/apache-airflow/stable/listeners.html  allows you to do exactly what you want. You can write your own notifiers, as comples or as difficult you want and using whatever mechanism you wantt for notification - by plugging in your own listener. You can even contribute such listeners back to Airflow if you think they might be useful for others (and for example add them to providers - similarly like elasticsearch, cloudwatch etc. logging handlers have been added there).  While listener API has not been foreseen for this initially, I believe it perfectly fits the needs you have. I suggest you migrate to Airflow 2.5.0 and try it even now to see if it works.  If you will find otherwise - feel free to comment in #12611 about your findings and why you think it's not good enough.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dinigo commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications

Posted by GitBox <gi...@apache.org>.
dinigo commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1359555364

   In general, I think the option for Dependency Injection (like we do with logging, email, lineage...) is very good, and I'll go with it rather than polluting the "user-space" (DAG)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] Taragolis commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications

Posted by GitBox <gi...@apache.org>.
Taragolis commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1355246110

   Just my 50 cents based on past experience. In projects where I worked we usual use callable objects instead of just functions just for additional re-usability.
   
   Something like this
   
   ```python
   from abc import abstractmethod
   from typing import Any
   
   from airflow.models.taskinstance import Context
   from airflow.utils.log.logging_mixin import LoggingMixin
   
   
   class BaseCallback(LoggingMixin):
       """Base callback for Airflow tasks"""
   
       @abstractmethod
       def callback(self, context: Context) -> None:
           pass
   
       def __call__(self, context: Context):
           try:
               self.callback(context)
           except Exception:
               self.log.exception("Error during callback")
               raise
   ```
   
   So after that we can parametrise our pipelines and create callback like this
   
   ```python
   from airflow.providers.slack.hooks.slack import SlackHook
   
   
   class SlackNotificationCallback(BaseCallback):
       def __init__(
           self,
           *,
           slack_conn_id: str,
           channel: str,
           template: str,
           template_type: str = "json",
           username: str = None,
           icon_emoji: str = None,
       ):
           super().__init__()
           self.slack_conn_id = slack_conn_id
           self.template = template
           self.template_type = template_type
           self.channel = channel
           self.username = username
           self.icon_emoji = icon_emoji
   
       def callback(self, context):
           hook = SlackHook(slack_conn_id=self.slack_conn_id)
           message = ... # Some magic with Jinja here
           ...
   
           return hook.client.api_call("chat.postMessage", json=message)
   ```
   
   And use it in task by different way
   
   ```python
   task = EmptyOperator(
       task_id = "awesome_task",
       on_failure_callback=SlackNotificationCallback(
           slack_conn_id="slack_api_conn",
           channel="#everything-fine",
           template="foo/bar/on-failure-slack.yml"
           template_type="yaml",
       ),
       on_retry_callback=SlackNotificationCallback(
           slack_conn_id="slack_api_conn",
           channel="#spam-nobody-read-it",
           template="foo/bar/on-retry-slack.yml"
           template_type="yaml",
       )
   )
   ```
   
   The small limitation with this approach:
   1. In case of task required more than one callback then need to create middleware Callback which run other callbacks in a loop
   2. Unable out of the box use templated fields as result need to implements all jinja stuff manually


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dinigo commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications

Posted by GitBox <gi...@apache.org>.
dinigo commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1359549462

   When I opened the issue there was no such thing as "Google Cloud Composer", It has now integrated logs and alarms. There are many workaround still.
   
   The issue remains cause I see people implementing the "alarming/monitoring/SRE" as part of the "Data Engineering/Orchestration". Those are different concerns and I think there should be some "alarming by default"
   
   I suggestion was to have a common notification interface, similar to the email one (`airflow/utils/notification.py`):
   ```python
   # inspired by https://github.com/apache/airflow/blob/main/airflow/utils/email.py
   # see https://airflow.apache.org/docs/apache-airflow/stable/howto/email-config.html
   class NotificationInterface(ABC):
       @abstractmethod
       def notify(
               self,
               conn_id: str = conf.get("notification", "CONN_ID", fallback=None),
               message: str = conf.get("notification", "MESSAGE_TEMPLATE", fallback=None),
               to: List[str] = conf.get("notification", "TO", fallback=None),
               from_user: str = conf.get("notification", "FROM_USER", fallback=None),
       ):
           """
           1. apply jinja to message
           2. retrieve the connection
           3. use the connection hook/method
           """
   
   Then implement this interface, for example for Slack (`airflow/providers/slack/notification.py`)
   ```python
   class SlackNotifier(NotificationInterface):
   
       def notify(
               self,
               conn_id: str = conf.get("notification", "CONN_ID", fallback=SlackHook.default_conn_name),
               message: str = conf.get("notification", "MESSAGE_TEMPLATE", fallback=None),
               to: List[str] = conf.get("notification", "TO", fallback=None),
               from_user: str = conf.get("notification", "FROM_USER", fallback='airflow')
       ):
           if message.endswith('.txt'):
               message = open(message).read()
           environment = jinja2.Environment()
           template = environment.from_string(message)
           rendered_message = template.render(from_user=from_user, to=to)
           slack_client = SlackHook(conn_id).client
           for channel in to:
               slack_client.chat_postMessage(
                   channel=channel,
                   text=rendered_message,
                   username=from_user,
               )
   ```
   
   So that you can easily configure it from the `airflow.cfg` (or env variables):
   ```toml
   [notification]
   class="airflow.providers.slack.notification"
   conn_id="slack_api_conn"
   to=['#airflow-notifications']
   message = "foo/bar/slack-message.txt"
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org