You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/16 17:16:59 UTC

[GitHub] [airflow] Taragolis commented on issue #28405: Add general-purpose "notifier" concept to DAGs for easier slack/teams/email notifications

Taragolis commented on issue #28405:
URL: https://github.com/apache/airflow/issues/28405#issuecomment-1355246110

   Just my 50 cents based on past experience. In projects where I worked we usual use callable objects instead of just functions just for additional re-usability.
   
   Something like this
   
   ```python
   from abc import abstractmethod
   from typing import Any
   
   from airflow.models.taskinstance import Context
   from airflow.utils.log.logging_mixin import LoggingMixin
   
   
   class BaseCallback(LoggingMixin):
       """Base callback for Airflow tasks"""
   
       @abstractmethod
       def callback(self, context: Context) -> None:
           pass
   
       def __call__(self, context: Context):
           try:
               self.callback(context)
           except Exception:
               self.log.exception("Error during callback")
               raise
   ```
   
   So after that we can parametrise our pipelines and create callback like this
   
   ```python
   from airflow.providers.slack.hooks.slack import SlackHook
   
   
   class SlackNotificationCallback(BaseCallback):
       def __init__(
           self,
           *,
           slack_conn_id: str,
           channel: str,
           template: str,
           template_type: str = "json",
           username: str = None,
           icon_emoji: str = None,
       ):
           super().__init__()
           self.slack_conn_id = slack_conn_id
           self.template = template
           self.template_type = template_type
           self.channel = channel
           self.username = username
           self.icon_emoji = icon_emoji
   
       def callback(self, context):
           hook = SlackHook(slack_conn_id=self.slack_conn_id)
           message = ... # Some magic with Jinja here
           ...
   
           return hook.client.api_call("chat.postMessage", json=message)
   ```
   
   And use it in task by different way
   
   ```python
   task = EmptyOperator(
       task_id = "awesome_task",
       on_failure_callback=SlackNotificationCallback(
           slack_conn_id="slack_api_conn",
           channel="#everything-fine",
           template="foo/bar/on-failure-slack.yml"
           template_type="yaml",
       ),
       on_retry_callback=SlackNotificationCallback(
           slack_conn_id="slack_api_conn",
           channel="#spam-nobody-read-it",
           template="foo/bar/on-retry-slack.yml"
           template_type="yaml",
       )
   )
   ```
   
   The small limitation with this approach:
   1. In case of task required more than one callback then need to create middleware Callback which run other callbacks in a loop
   2. Unable out of the box use templated fields as result need to implements all jinja stuff manually


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org