You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/22 19:48:13 UTC
[airflow] branch main updated: Refactor SlackWebhookHook in order to use `slack_sdk` instead of HttpHook methods (#26452)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 95a5fc7ec9 Refactor SlackWebhookHook in order to use `slack_sdk` instead of HttpHook methods (#26452)
95a5fc7ec9 is described below
commit 95a5fc7ec9a637337af9446f11d7f90a6e47e006
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Thu Sep 22 23:48:02 2022 +0400
Refactor SlackWebhookHook in order to use `slack_sdk` instead of HttpHook methods (#26452)
* Refactor SlackWebhookHook in order to use `slack_sdk` instead of HttpHook methods
---
airflow/providers/slack/CHANGELOG.rst | 10 +
airflow/providers/slack/hooks/slack.py | 6 +-
airflow/providers/slack/hooks/slack_webhook.py | 524 +++++++++++++----
airflow/providers/slack/operators/slack_webhook.py | 37 +-
airflow/providers/slack/transfers/sql_to_slack.py | 7 +-
.../connections/slack-incoming-webhook.rst | 95 +++
.../connections/slack.rst | 2 +-
docs/spelling_wordlist.txt | 1 +
tests/providers/slack/hooks/test_slack_webhook.py | 651 +++++++++++++++++----
.../providers/slack/transfers/test_sql_to_slack.py | 35 +-
.../snowflake/transfers/test_snowflake_to_slack.py | 9 +-
11 files changed, 1101 insertions(+), 276 deletions(-)
diff --git a/airflow/providers/slack/CHANGELOG.rst b/airflow/providers/slack/CHANGELOG.rst
index 6bfb86c6fd..5153473809 100644
--- a/airflow/providers/slack/CHANGELOG.rst
+++ b/airflow/providers/slack/CHANGELOG.rst
@@ -24,6 +24,16 @@
Changelog
---------
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+* The hook class :class:`airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook` no longer inherits from
+ :class:`airflow.providers.http.hooks.http.HttpHook`. In practice this only affects
+ user-defined classes that are based on **SlackWebhookHook** and use attributes from **HttpHook**.
+* Drop support for the deprecated ``webhook_token`` parameter in the
+ :ref:`Slack Incoming Webhook Connection <howto/connection:slack-incoming-webhook>` Extra.
+
+
5.1.0
.....
diff --git a/airflow/providers/slack/hooks/slack.py b/airflow/providers/slack/hooks/slack.py
index 5d280a6c5b..60d6ce78a9 100644
--- a/airflow/providers/slack/hooks/slack.py
+++ b/airflow/providers/slack/hooks/slack.py
@@ -49,7 +49,7 @@ class SlackHook(BaseHook):
.. warning::
This hook intend to use `Slack API` connection
- and might not work correctly with `Slack Webhook` and `HTTP` connections.
+ and might not work correctly with `Slack Incoming Webhook` and `HTTP` connections.
Takes both Slack API token directly and connection that has Slack API token. If both are
supplied, Slack API token will be used. Also exposes the rest of slack.WebClient args.
@@ -74,8 +74,8 @@ class SlackHook(BaseHook):
and receive a response from Slack. If not set than default WebClient value will use.
:param base_url: A string representing the Slack API base URL.
If not set than default WebClient BASE_URL will use (``https://www.slack.com/api/``).
- :param proxy: Proxy to make the Slack Incoming Webhook call.
- :param retry_handlers: List of handlers to customize retry logic in WebClient.
+ :param proxy: Proxy to make the Slack API call.
+ :param retry_handlers: List of handlers to customize retry logic in ``slack_sdk.WebClient``.
:param token: (deprecated) Slack API Token.
"""
diff --git a/airflow/providers/slack/hooks/slack_webhook.py b/airflow/providers/slack/hooks/slack_webhook.py
index af28f15c67..d6659da381 100644
--- a/airflow/providers/slack/hooks/slack_webhook.py
+++ b/airflow/providers/slack/hooks/slack_webhook.py
@@ -19,144 +19,454 @@ from __future__ import annotations
import json
import warnings
+from functools import wraps
+from typing import TYPE_CHECKING, Any, Callable
+from urllib.parse import urlparse
+from slack_sdk import WebhookClient
+
+from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException
-from airflow.providers.http.hooks.http import HttpHook
+from airflow.hooks.base import BaseHook
+from airflow.models import Connection
+from airflow.providers.slack.utils import ConnectionExtraConfig, prefixed_extra_field
+from airflow.utils.log.secrets_masker import mask_secret
+
+if TYPE_CHECKING:
+ from slack_sdk.http_retry import RetryHandler
+
+DEFAULT_SLACK_WEBHOOK_ENDPOINT = "https://hooks.slack.com/services"
+LEGACY_INTEGRATION_PARAMS = ("channel", "username", "icon_emoji", "icon_url")
+
+def check_webhook_response(func: Callable) -> Callable:
+ """Function decorator that check WebhookResponse and raise an error if status code != 200."""
-class SlackWebhookHook(HttpHook):
+ @wraps(func)
+ def wrapper(*args, **kwargs) -> Callable:
+ resp = func(*args, **kwargs)
+ if resp.status_code != 200:
+ raise AirflowException(
+ f"Response body: {resp.body!r}, Status Code: {resp.status_code}. "
+ "See: https://api.slack.com/messaging/webhooks#handling_errors"
+ )
+ return resp
+
+ return wrapper
+
+
+class SlackWebhookHook(BaseHook):
"""
- This hook allows you to post messages to Slack using incoming webhooks.
- Takes both Slack webhook token directly and connection that has Slack webhook token.
- If both supplied, http_conn_id will be used as base_url,
- and webhook_token will be taken as endpoint, the relative path of the url.
+ This class provide a thin wrapper around the ``slack_sdk.WebhookClient``.
+ This hook allows you to post messages to Slack by using Incoming Webhooks.
+
+ .. seealso::
+ - :ref:`Slack Incoming Webhook connection <howto/connection:slack-incoming-webhook>`
+ - https://api.slack.com/messaging/webhooks
+ - https://slack.dev/python-slack-sdk/webhook/index.html
+
+ .. note::
+ You cannot override the default channel (chosen by the user who installed your app),
+ username, or icon when you're using Incoming Webhooks to post messages.
+ Instead, these values will always inherit from the associated Slack App configuration
+ (`link <https://api.slack.com/messaging/webhooks#advanced_message_formatting>`_).
+ It is possible to change this values only in `Legacy Slack Integration Incoming Webhook
+ <https://api.slack.com/legacy/custom-integrations/messaging/webhooks#legacy-customizations>`_.
.. warning::
- This hook intend to use `Slack Webhook` connection
+ This hook intend to use `Slack Incoming Webhook` connection
and might not work correctly with `Slack API` connection.
- Each Slack webhook token can be pre-configured to use a specific channel, username and
- icon. You can override these defaults in this hook.
-
- :param http_conn_id: connection that has Slack webhook token in the password field
- :param webhook_token: Slack webhook token
- :param message: The message you want to send on Slack
- :param attachments: The attachments to send on Slack. Should be a list of
- dictionaries representing Slack attachments.
- :param blocks: The blocks to send on Slack. Should be a list of
- dictionaries representing Slack blocks.
- :param channel: The channel the message should be posted to
- :param username: The username to post to slack with
- :param icon_emoji: The emoji to use as icon for the user posting to Slack
- :param icon_url: The icon image URL string to use in place of the default icon.
- :param link_names: Whether or not to find and link channel and usernames in your
- message
- :param proxy: Proxy to use to make the Slack webhook call
+ Examples:
+ .. code-block:: python
+
+ # Create hook
+ hook = SlackWebhookHook(slack_webhook_conn_id="slack_default")
+
+ # Post message in Slack channel by JSON formatted message
+ # See: https://api.slack.com/messaging/webhooks#posting_with_webhooks
+ hook.send_dict({"text": "Hello world!"})
+
+ # Post simple message in Slack channel
+ hook.send_text("Hello world!")
+
+ # Use ``slack_sdk.WebhookClient``
+ hook.client.send(text="Hello world!")
+
+ :param slack_webhook_conn_id: Slack Incoming Webhook connection id
+ that has Incoming Webhook token in the password field.
+ :param timeout: The maximum number of seconds the client will wait to connect
+ and receive a response from Slack. If not set than default WebhookClient value will use.
+ :param proxy: Proxy to make the Slack Incoming Webhook call.
+ :param retry_handlers: List of handlers to customize retry logic in ``slack_sdk.WebhookClient``.
+ :param webhook_token: (deprecated) Slack Incoming Webhook token.
+ Use instead Slack Incoming Webhook connection password field.
"""
- conn_name_attr = 'http_conn_id'
+ conn_name_attr = 'slack_webhook_conn_id'
default_conn_name = 'slack_default'
conn_type = 'slackwebhook'
- hook_name = 'Slack Webhook'
+ hook_name = 'Slack Incoming Webhook'
def __init__(
self,
- http_conn_id=None,
- webhook_token=None,
- message="",
- attachments=None,
- blocks=None,
- channel=None,
- username=None,
- icon_emoji=None,
- icon_url=None,
- link_names=False,
- proxy=None,
- *args,
+ slack_webhook_conn_id: str | None = None,
+ webhook_token: str | None = None,
+ timeout: int | None = None,
+ proxy: str | None = None,
+ retry_handlers: list[RetryHandler] | None = None,
**kwargs,
):
- super().__init__(http_conn_id=http_conn_id, *args, **kwargs)
- self.webhook_token = self._get_token(webhook_token, http_conn_id)
- self.message = message
- self.attachments = attachments
- self.blocks = blocks
- self.channel = channel
- self.username = username
- self.icon_emoji = icon_emoji
- self.icon_url = icon_url
- self.link_names = link_names
- self.proxy = proxy
+ super().__init__()
- def _get_token(self, token: str, http_conn_id: str | None) -> str:
- """
- Given either a manually set token or a conn_id, return the webhook_token to use.
+ http_conn_id = kwargs.pop("http_conn_id", None)
+ if http_conn_id:
+ warnings.warn(
+ 'Parameter `http_conn_id` is deprecated. Please use `slack_webhook_conn_id` instead.',
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ if slack_webhook_conn_id:
+ raise AirflowException("You cannot provide both `slack_webhook_conn_id` and `http_conn_id`.")
+ slack_webhook_conn_id = http_conn_id
- :param token: The manually provided token
- :param http_conn_id: The conn_id provided
- :return: webhook_token to use
- :rtype: str
- """
- if token:
- return token
- elif http_conn_id:
- conn = self.get_connection(http_conn_id)
+ if not slack_webhook_conn_id and not webhook_token:
+ raise AirflowException("Either `slack_webhook_conn_id` or `webhook_token` should be provided.")
+ if webhook_token:
+ mask_secret(webhook_token)
+ warnings.warn(
+ "Provide `webhook_token` as hook argument deprecated by security reason and will be removed "
+ "in a future releases. Please specify it in `Slack Webhook` connection.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ if not slack_webhook_conn_id:
+ warnings.warn(
+ "You have not set parameter `slack_webhook_conn_id`. Currently `Slack Incoming Webhook` "
+ "connection id optional but in a future release it will mandatory.",
+ FutureWarning,
+ stacklevel=2,
+ )
- if getattr(conn, 'password', None):
- return conn.password
- else:
- extra = conn.extra_dejson
- web_token = extra.get('webhook_token', '')
+ self.slack_webhook_conn_id = slack_webhook_conn_id
+ self.timeout = timeout
+ self.proxy = proxy
+ self.retry_handlers = retry_handlers
+ self._webhook_token = webhook_token
- if web_token:
+ # Compatibility with previous version of SlackWebhookHook
+ deprecated_class_attrs = []
+ for deprecated_attr in (
+ "message",
+ "attachments",
+ "blocks",
+ "channel",
+ "username",
+ "icon_emoji",
+ "icon_url",
+ "link_names",
+ ):
+ if deprecated_attr in kwargs:
+ deprecated_class_attrs.append(deprecated_attr)
+ setattr(self, deprecated_attr, kwargs.pop(deprecated_attr))
+ if deprecated_attr == "message":
+ # Slack WebHook Post Request not expected `message` as field,
+ # so we also set "text" attribute which will check by SlackWebhookHook._resolve_argument
+ self.text = getattr(self, deprecated_attr)
+ elif deprecated_attr == "link_names":
warnings.warn(
- "'webhook_token' in 'extra' is deprecated. Please use 'password' field",
- DeprecationWarning,
+ "`link_names` has no affect, if you want to mention user see: "
+ "https://api.slack.com/reference/surfaces/formatting#mentioning-users",
+ UserWarning,
stacklevel=2,
)
- return web_token
+ if deprecated_class_attrs:
+ warnings.warn(
+ f"Provide {','.join(repr(a) for a in deprecated_class_attrs)} as hook argument(s) "
+ f"is deprecated and will be removed in a future releases. "
+ f"Please specify attributes in `{self.__class__.__name__}.send` method instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+
+ self.extra_client_args = kwargs
+
+ @cached_property
+ def client(self) -> WebhookClient:
+ """Get the underlying slack_sdk.webhook.WebhookClient (cached)."""
+ return WebhookClient(**self._get_conn_params())
+
+ def get_conn(self) -> WebhookClient:
+ """Get the underlying slack_sdk.webhook.WebhookClient (cached)."""
+ return self.client
+
+ @cached_property
+ def webhook_token(self) -> str:
+ """Return Slack Webhook Token URL."""
+ warnings.warn(
+ "`SlackHook.webhook_token` property deprecated and will be removed in a future releases.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return self._get_conn_params()["url"]
+
+ def _get_conn_params(self) -> dict[str, Any]:
+ """Fetch connection params as a dict and merge it with hook parameters."""
+ default_schema, _, default_host = DEFAULT_SLACK_WEBHOOK_ENDPOINT.partition("://")
+ if self.slack_webhook_conn_id:
+ conn = self.get_connection(self.slack_webhook_conn_id)
else:
- raise AirflowException('Cannot get token: No valid Slack webhook token nor conn_id supplied')
+ # If slack_webhook_conn_id not specified, then use connection with default schema and host
+ conn = Connection(
+ conn_id=None, conn_type=self.conn_type, host=default_schema, password=default_host
+ )
+ extra_config = ConnectionExtraConfig(
+ conn_type=self.conn_type,
+ conn_id=conn.conn_id,
+ extra=conn.extra_dejson,
+ )
+ conn_params: dict[str, Any] = {"retry_handlers": self.retry_handlers}
+
+ webhook_token = None
+ if self._webhook_token:
+ self.log.debug("Retrieving Slack Webhook Token from hook attribute.")
+ webhook_token = self._webhook_token
+ elif conn.conn_id:
+ if conn.password:
+ self.log.debug(
+ "Retrieving Slack Webhook Token from Connection ID %r password.",
+ self.slack_webhook_conn_id,
+ )
+ webhook_token = conn.password
+
+ webhook_token = webhook_token or ""
+ if not webhook_token and not conn.host:
+ raise AirflowException("Cannot get token: No valid Slack token nor valid Connection ID supplied.")
+ elif webhook_token and "://" in webhook_token:
+ self.log.debug("Retrieving Slack Webhook Token URL from webhook token.")
+ url = webhook_token
+ else:
+ self.log.debug("Constructing Slack Webhook Token URL.")
+ if conn.host and "://" in conn.host:
+ base_url = conn.host
+ else:
+ schema = conn.schema if conn.schema else default_schema
+ host = conn.host if conn.host else default_host
+ base_url = f"{schema}://{host}"
+
+ base_url = base_url.rstrip("/")
+ if not webhook_token:
+ parsed_token = (urlparse(base_url).path or "").strip("/")
+ if base_url == DEFAULT_SLACK_WEBHOOK_ENDPOINT or not parsed_token:
+ # Raise an error in case of password not specified and
+ # 1. Result of constructing base_url equal https://hooks.slack.com/services
+ # 2. Empty url path, e.g. if base_url = https://hooks.slack.com
+ raise AirflowException(
+ "Cannot get token: No valid Slack token nor valid Connection ID supplied."
+ )
+ mask_secret(parsed_token)
+ warnings.warn(
+ f"Found Slack Webhook Token URL in Connection {conn.conn_id!r} `host` "
+ "and `password` field is empty. This behaviour deprecated "
+ "and could expose you token in the UI and will be removed in a future releases.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ url = (base_url.rstrip("/") + "/" + webhook_token.lstrip("/")).rstrip("/")
+
+ conn_params["url"] = url
+ # Merge Hook parameters with Connection config
+ conn_params.update(
+ {
+ "timeout": self.timeout or extra_config.getint("timeout", default=None),
+ "proxy": self.proxy or extra_config.get("proxy", default=None),
+ }
+ )
+ # Add additional client args
+ conn_params.update(self.extra_client_args)
+ if "logger" not in conn_params:
+ conn_params["logger"] = self.log
+
+ return {k: v for k, v in conn_params.items() if v is not None}
+
+ def _resolve_argument(self, name: str, value):
+ """
+ Resolve message parameters.
+
+ .. note::
+ This method exist for compatibility and merge instance class attributes with
+ method attributes and not be required when assign class attributes to message
+ would completely remove.
+ """
+ if value is None and name in (
+ "text",
+ "attachments",
+ "blocks",
+ "channel",
+ "username",
+ "icon_emoji",
+ "icon_url",
+ "link_names",
+ ):
+ return getattr(self, name, None)
- def _build_slack_message(self) -> str:
+ return value
+
+ @check_webhook_response
+ def send_dict(self, body: dict[str, Any] | str, *, headers: dict[str, str] | None = None):
"""
- Construct the Slack message. All relevant parameters are combined here to a valid
- Slack json message.
+ Performs a Slack Incoming Webhook request with given JSON data block.
- :return: Slack message to send
- :rtype: str
+ :param body: JSON data structure, expected dict or JSON-string.
+ :param headers: Request headers for this request.
"""
- cmd = {}
-
- if self.channel:
- cmd['channel'] = self.channel
- if self.username:
- cmd['username'] = self.username
- if self.icon_emoji:
- cmd['icon_emoji'] = self.icon_emoji
- if self.icon_url:
- cmd['icon_url'] = self.icon_url
- if self.link_names:
- cmd['link_names'] = 1
- if self.attachments:
- cmd['attachments'] = self.attachments
- if self.blocks:
- cmd['blocks'] = self.blocks
-
- cmd['text'] = self.message
- return json.dumps(cmd)
+ if isinstance(body, str):
+ try:
+ body = json.loads(body)
+ except json.JSONDecodeError as err:
+ raise AirflowException(
+ f"Body expected valid JSON string, got {body!r}. Original error:\n * {err}"
+ ) from None
+
+ if not isinstance(body, dict):
+ raise TypeError(f"Body expected dictionary, got {type(body).__name__}.")
+
+ if any(legacy_attr in body for legacy_attr in ("channel", "username", "icon_emoji", "icon_url")):
+ warnings.warn(
+ "You cannot override the default channel (chosen by the user who installed your app), "
+ "username, or icon when you're using Incoming Webhooks to post messages. "
+ "Instead, these values will always inherit from the associated Slack app configuration. "
+ "See: https://api.slack.com/messaging/webhooks#advanced_message_formatting. "
+ "It is possible to change this values only in Legacy Slack Integration Incoming Webhook: "
+ "https://api.slack.com/legacy/custom-integrations/messaging/webhooks#legacy-customizations",
+ UserWarning,
+ stacklevel=2,
+ )
+
+ return self.client.send_dict(body, headers=headers)
+
+ def send(
+ self,
+ *,
+ text: str | None = None,
+ attachments: list[dict[str, Any]] | None = None,
+ blocks: list[dict[str, Any]] | None = None,
+ response_type: str | None = None,
+ replace_original: bool | None = None,
+ delete_original: bool | None = None,
+ unfurl_links: bool | None = None,
+ unfurl_media: bool | None = None,
+ headers: dict[str, str] | None = None,
+ **kwargs,
+ ):
+ """
+ Performs a Slack Incoming Webhook request with given arguments.
+
+ :param text: The text message
+ (even when having blocks, setting this as well is recommended as it works as fallback).
+ :param attachments: A collection of attachments.
+ :param blocks: A collection of Block Kit UI components.
+ :param response_type: The type of message (either 'in_channel' or 'ephemeral').
+ :param replace_original: True if you use this option for response_url requests.
+ :param delete_original: True if you use this option for response_url requests.
+ :param unfurl_links: Option to indicate whether text url should unfurl.
+ :param unfurl_media: Option to indicate whether media url should unfurl.
+ :param headers: Request headers for this request.
+ """
+ body = {
+ "text": self._resolve_argument("text", text),
+ "attachments": self._resolve_argument("attachments", attachments),
+ "blocks": self._resolve_argument("blocks", blocks),
+ "response_type": response_type,
+ "replace_original": replace_original,
+ "delete_original": delete_original,
+ "unfurl_links": unfurl_links,
+ "unfurl_media": unfurl_media,
+ # Legacy Integration Parameters
+ **{lip: self._resolve_argument(lip, kwargs.pop(lip, None)) for lip in LEGACY_INTEGRATION_PARAMS},
+ }
+ if kwargs:
+ warnings.warn(
+ f"Found unexpected keyword-argument(s) {', '.join(repr(k) for k in kwargs)} "
+ "in `send` method. This argument(s) have no effect.",
+ UserWarning,
+ stacklevel=2,
+ )
+ body = {k: v for k, v in body.items() if v is not None}
+ return self.send_dict(body=body, headers=headers)
+
+ def send_text(
+ self,
+ text: str,
+ *,
+ unfurl_links: bool | None = None,
+ unfurl_media: bool | None = None,
+ headers: dict[str, str] | None = None,
+ ):
+ """
+ Performs a Slack Incoming Webhook request with given text.
+
+ :param text: The text message.
+ :param unfurl_links: Option to indicate whether text url should unfurl.
+ :param unfurl_media: Option to indicate whether media url should unfurl.
+ :param headers: Request headers for this request.
+ """
+ return self.send(text=text, unfurl_links=unfurl_links, unfurl_media=unfurl_media, headers=headers)
+
+ @classmethod
+ def get_connection_form_widgets(cls) -> dict[str, Any]:
+ """Returns dictionary of widgets to be added for the hook to handle extra values."""
+ from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
+ from flask_babel import lazy_gettext
+ from wtforms import IntegerField, StringField
+
+ return {
+ prefixed_extra_field("timeout", cls.conn_type): IntegerField(
+ lazy_gettext("Timeout"),
+ widget=BS3TextFieldWidget(),
+ description="Optional. The maximum number of seconds the client will wait to connect "
+ "and receive a response from Slack Incoming Webhook.",
+ ),
+ prefixed_extra_field("proxy", cls.conn_type): StringField(
+ lazy_gettext('Proxy'),
+ widget=BS3TextFieldWidget(),
+ description="Optional. Proxy to make the Slack Incoming Webhook call.",
+ ),
+ }
+
+ @classmethod
+ def get_ui_field_behaviour(cls) -> dict[str, Any]:
+ """Returns custom field behaviour."""
+ return {
+ "hidden_fields": ["login", "port", "extra"],
+ "relabeling": {
+ "host": "Slack Webhook Endpoint",
+ "password": "Webhook Token",
+ },
+ "placeholders": {
+ "schema": "https",
+ "host": "hooks.slack.com/services",
+ "password": "T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
+ prefixed_extra_field("timeout", cls.conn_type): "30",
+ prefixed_extra_field("proxy", cls.conn_type): "http://localhost:9000",
+ },
+ }
def execute(self) -> None:
- """Remote Popen (actually execute the slack webhook call)"""
- proxies = {}
- if self.proxy:
- # we only need https proxy for Slack, as the endpoint is https
- proxies = {'https': self.proxy}
-
- slack_message = self._build_slack_message()
- self.run(
- endpoint=self.webhook_token,
- data=slack_message,
- headers={'Content-type': 'application/json'},
- extra_options={'proxies': proxies, 'check_response': True},
+ """
+ Remote Popen (actually execute the slack webhook call).
+
+ .. note::
+ This method exist for compatibility with previous version of operator
+ and expected that Slack Incoming Webhook message constructing from class attributes rather than
+ pass as method arguments.
+ """
+ warnings.warn(
+ "`SlackWebhookHook.execute` method deprecated and will be removed in a future releases. "
+ "Please use `SlackWebhookHook.send` or `SlackWebhookHook.send_dict` or "
+ "`SlackWebhookHook.send_text` methods instead.",
+ DeprecationWarning,
+ stacklevel=2,
)
+ self.send()
diff --git a/airflow/providers/slack/operators/slack_webhook.py b/airflow/providers/slack/operators/slack_webhook.py
index ebd51980dc..6872772a69 100644
--- a/airflow/providers/slack/operators/slack_webhook.py
+++ b/airflow/providers/slack/operators/slack_webhook.py
@@ -19,6 +19,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Sequence
+from airflow.compat.functools import cached_property
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
@@ -81,6 +82,7 @@ class SlackWebhookOperator(SimpleHttpOperator):
super().__init__(endpoint=webhook_token, **kwargs)
self.http_conn_id = http_conn_id
self.webhook_token = webhook_token
+ self.proxy = proxy
self.message = message
self.attachments = attachments
self.blocks = blocks
@@ -89,22 +91,27 @@ class SlackWebhookOperator(SimpleHttpOperator):
self.icon_emoji = icon_emoji
self.icon_url = icon_url
self.link_names = link_names
- self.proxy = proxy
- self.hook: SlackWebhookHook | None = None
+
+ @cached_property
+ def hook(self) -> SlackWebhookHook:
+ return SlackWebhookHook(
+ http_conn_id=self.http_conn_id,
+ webhook_token=self.webhook_token,
+ proxy=self.proxy,
+ )
def execute(self, context: Context) -> None:
"""Call the SlackWebhookHook to post the provided Slack message"""
- self.hook = SlackWebhookHook(
- self.http_conn_id,
- self.webhook_token,
- self.message,
- self.attachments,
- self.blocks,
- self.channel,
- self.username,
- self.icon_emoji,
- self.icon_url,
- self.link_names,
- self.proxy,
+ self.hook.send(
+ text=self.message,
+ attachments=self.attachments,
+ blocks=self.blocks,
+ # Parameters below use for compatibility with previous version of Operator and warn user if it set
+ # Legacy Integration Parameters
+ channel=self.channel,
+ username=self.username,
+ icon_emoji=self.icon_emoji,
+ icon_url=self.icon_url,
+ # Unused Parameters, if not None than warn user
+ link_names=self.link_names,
)
- self.hook.execute()
diff --git a/airflow/providers/slack/transfers/sql_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py
index 97d06dc565..2161899bf8 100644
--- a/airflow/providers/slack/transfers/sql_to_slack.py
+++ b/airflow/providers/slack/transfers/sql_to_slack.py
@@ -129,14 +129,11 @@ class SqlToSlackOperator(BaseOperator):
slack_hook = self._get_slack_hook()
self.log.info('Sending slack message: %s', self.slack_message)
- slack_hook.execute()
+ slack_hook.send(text=self.slack_message, channel=self.slack_channel)
def _get_slack_hook(self) -> SlackWebhookHook:
return SlackWebhookHook(
- http_conn_id=self.slack_conn_id,
- message=self.slack_message,
- channel=self.slack_channel,
- webhook_token=self.slack_webhook_token,
+ slack_webhook_conn_id=self.slack_conn_id, webhook_token=self.slack_webhook_token
)
def render_template_fields(self, context, jinja_env=None) -> None:
diff --git a/docs/apache-airflow-providers-slack/connections/slack-incoming-webhook.rst b/docs/apache-airflow-providers-slack/connections/slack-incoming-webhook.rst
new file mode 100644
index 0000000000..6017862721
--- /dev/null
+++ b/docs/apache-airflow-providers-slack/connections/slack-incoming-webhook.rst
@@ -0,0 +1,95 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+.. _howto/connection:slack-incoming-webhook:
+
+Slack Incoming Webhook Connection
+=================================
+
+The Slack Incoming Webhook connection type enables
+`Slack Incoming Webhooks <https://api.slack.com/messaging/webhooks>`_ Integrations.
+
+Authenticating to Slack
+-----------------------
+
+Authenticate to Slack using an `Incoming Webhook URL
+<https://api.slack.com/messaging/webhooks#getting_started>`_.
+
+Default Connection IDs
+----------------------
+
+.. warning::
+
+ The :class:`airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook` and community provided operators
+ do not use any Slack Incoming Webhook Connection by default right now.
+ This might change in the future to ``slack_default``.
+
+Configuring the Connection
+--------------------------
+
+Schema
+ Optional. HTTP schema; if not specified then **https** is used.
+
+Slack Webhook Endpoint (Host)
+ Optional. Reference to the Slack webhook endpoint; if not specified then **hooks.slack.com/services** is used.
+ If the endpoint contains a schema, the value of the ``Schema`` field is ignored.
+
+Webhook Token (Password)
+ Specify the Slack Incoming Webhook URL. It might be specified as a full URL like
+ **https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX**, in which case the values
+ from the ``Slack Webhook Endpoint (Host)`` and ``Schema`` fields are ignored.
+ Or it might be specified as a URL path like **T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX**, in which case
+ the Slack Incoming Webhook URL will be built from this field, ``Schema`` and ``Slack Webhook Endpoint (Host)``.
+
+Extra
+ Specify the extra parameters (as json dictionary) that can be used in
+ `slack_sdk.WebhookClient <https://slack.dev/python-slack-sdk/webhook/index.html>`_.
+ All parameters are optional.
+
+ * ``timeout``: The maximum number of seconds the client will wait to connect
+ and receive a response from Slack Incoming Webhook.
+ * ``proxy``: Proxy to make the Slack Incoming Webhook call.
+
+If you are configuring the connection via a URI, ensure that all components of the URI are URL-encoded.
+
+Examples
+--------
+
+**Snippet for creating a Connection as URI**:
+ .. code-block:: python
+
+ from airflow.models.connection import Connection
+
+ conn = Connection(
+ conn_id="slack_default",
+ conn_type="slackwebhook",
+ password="T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
+ extra={
+ # Specify extra parameters here
+ "timeout": "42",
+ },
+ )
+
+ # Generate Environment Variable Name
+ env_key = f"AIRFLOW_CONN_{conn.conn_id.upper()}"
+ print(f"{env_key}='{conn.get_uri()}'")
+
+**Set Slack Incoming Webhook Connection as Environment Variable (URI)**
+ .. code-block:: bash
+
+ export AIRFLOW_CONN_SLACK_DEFAULT='slackwebhook://:T00000000%2FB00000000%2FXXXXXXXXXXXXXXXXXXXXXXXX@/?timeout=42'
diff --git a/docs/apache-airflow-providers-slack/connections/slack.rst b/docs/apache-airflow-providers-slack/connections/slack.rst
index c07ea2191f..d4953fac1c 100644
--- a/docs/apache-airflow-providers-slack/connections/slack.rst
+++ b/docs/apache-airflow-providers-slack/connections/slack.rst
@@ -50,7 +50,7 @@ Extra (optional)
* ``timeout``: The maximum number of seconds the client will wait to connect and receive a response from Slack API.
* ``base_url``: A string representing the Slack API base URL.
- * ``proxy``: Proxy to make the Slack Incoming Webhook call.
+ * ``proxy``: Proxy to make the Slack API call.
If you are configuring the connection via a URI, ensure that all components of the URI are URL-encoded.
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index fa37534f5c..06ec96fd35 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1566,6 +1566,7 @@ WebClient
webhdfs
Webhook
webhook
+WebhookClient
webhooks
webpage
webProperty
diff --git a/tests/providers/slack/hooks/test_slack_webhook.py b/tests/providers/slack/hooks/test_slack_webhook.py
index 8081e1d96e..d56ae15101 100644
--- a/tests/providers/slack/hooks/test_slack_webhook.py
+++ b/tests/providers/slack/hooks/test_slack_webhook.py
@@ -15,158 +15,555 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+#
+
from __future__ import annotations
import json
-import unittest
+from typing import Any
from unittest import mock
-from requests.exceptions import MissingSchema
-
-from airflow.models import Connection
-from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
-from airflow.utils import db
-
-
-class TestSlackWebhookHook(unittest.TestCase):
-
- _config = {
- 'http_conn_id': 'slack-webhook-default',
- 'webhook_token': 'manual_token',
- 'message': 'Awesome message to put on Slack',
- 'attachments': [{'fallback': 'Required plain-text summary'}],
- 'blocks': [{'type': 'section', 'text': {'type': 'mrkdwn', 'text': '*bold text*'}}],
- 'channel': '#general',
- 'username': 'SlackMcSlackFace',
- 'icon_emoji': ':hankey:',
- 'icon_url': 'https://airflow.apache.org/_images/pin_large.png',
- 'link_names': True,
- 'proxy': 'https://my-horrible-proxy.proxyist.com:8080',
- }
- expected_message_dict = {
- 'channel': _config['channel'],
- 'username': _config['username'],
- 'icon_emoji': _config['icon_emoji'],
- 'icon_url': _config['icon_url'],
- 'link_names': 1,
- 'attachments': _config['attachments'],
- 'blocks': _config['blocks'],
- 'text': _config['message'],
- }
- expected_message = json.dumps(expected_message_dict)
- expected_url = 'https://hooks.slack.com/services/T000/B000/XXX'
- expected_method = 'POST'
-
- def setUp(self):
- db.merge_conn(
- Connection(
- conn_id='slack-webhook-default',
- conn_type='slackwebhook',
- extra='{"webhook_token": "your_token_here"}',
- )
- )
- db.merge_conn(
- Connection(
- conn_id='slack-webhook-url',
- conn_type='slackwebhook',
- host='https://hooks.slack.com/services/T000/B000/XXX',
- )
+import pytest
+from slack_sdk.http_retry.builtin_handlers import ConnectionErrorRetryHandler, RateLimitErrorRetryHandler
+from slack_sdk.webhook.webhook_response import WebhookResponse
+
+from airflow.exceptions import AirflowException
+from airflow.models.connection import Connection
+from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook, check_webhook_response
+
+TEST_TOKEN = "T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
+TEST_WEBHOOK_URL = f"https://hooks.slack.com/services/{TEST_TOKEN}"
+TEST_CUSTOM_SCHEMA = "http"
+TEST_CUSTOM_ENDPOINT = "example.org/slack/webhooks"
+TEST_CUSTOM_WEBHOOK_URL = f"{TEST_CUSTOM_SCHEMA}://{TEST_CUSTOM_ENDPOINT}/{TEST_TOKEN}"
+TEST_CONN_ID = SlackWebhookHook.default_conn_name
+CONN_TYPE = "slackwebhook"
+TEST_CONN_ERROR_RETRY_HANDLER = ConnectionErrorRetryHandler(max_retry_count=42)
+TEST_RATE_LIMIT_RETRY_HANDLER = RateLimitErrorRetryHandler()
+MOCK_WEBHOOK_RESPONSE = WebhookResponse(url="foo://bar", status_code=200, body="ok", headers={})
+
+
+@pytest.fixture(scope="module", autouse=True)
+def slack_webhook_connections():
+ """Create test connections."""
+ connections = [
+ Connection(
+ conn_id=SlackWebhookHook.default_conn_name,
+ conn_type=CONN_TYPE,
+ password=TEST_TOKEN,
+ ),
+ Connection(
+ conn_id="conn_full_url_connection",
+ conn_type=CONN_TYPE,
+ password=TEST_WEBHOOK_URL,
+ ),
+ Connection(
+ conn_id="conn_full_url_connection_with_host",
+ conn_type=CONN_TYPE,
+ host="http://example.org/hooks/",
+ password=TEST_WEBHOOK_URL,
+ ),
+ Connection(
+ conn_id="conn_host_with_schema",
+ conn_type=CONN_TYPE,
+ host="https://hooks.slack.com/services/",
+ password=f"/{TEST_TOKEN}",
+ ),
+ Connection(
+ conn_id="conn_host_without_schema",
+ conn_type=CONN_TYPE,
+ host="hooks.slack.com/services/",
+ password=f"/{TEST_TOKEN}",
+ ),
+ Connection(
+ conn_id="conn_parts",
+ conn_type=CONN_TYPE,
+ host="hooks.slack.com/services",
+ schema="https",
+ password=f"/{TEST_TOKEN}",
+ ),
+ Connection(
+ conn_id="conn_deprecated_extra",
+ conn_type=CONN_TYPE,
+ host="https://hooks.slack.com/services/",
+ extra={"webhook_token": TEST_TOKEN},
+ ),
+ Connection(conn_id="conn_token_in_host_1", conn_type=CONN_TYPE, host=TEST_WEBHOOK_URL),
+ Connection(
+ conn_id="conn_token_in_host_2",
+ conn_type=CONN_TYPE,
+ schema="https",
+ host="hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX",
+ ),
+ Connection(
+ conn_id="conn_custom_endpoint_1",
+ conn_type=CONN_TYPE,
+ schema=TEST_CUSTOM_SCHEMA,
+ host=TEST_CUSTOM_ENDPOINT,
+ password=TEST_TOKEN,
+ ),
+ Connection(
+ conn_id="conn_custom_endpoint_2",
+ conn_type=CONN_TYPE,
+ host=f"{TEST_CUSTOM_SCHEMA}://{TEST_CUSTOM_ENDPOINT}",
+ password=TEST_TOKEN,
+ ),
+ Connection(
+ conn_id="conn_custom_endpoint_3",
+ conn_type=CONN_TYPE,
+ password=TEST_CUSTOM_WEBHOOK_URL,
+ ),
+ Connection(
+ conn_id="conn_empty",
+ conn_type=CONN_TYPE,
+ ),
+ Connection(
+ conn_id="conn_password_empty_1",
+ conn_type=CONN_TYPE,
+ host="https://hooks.slack.com/services/",
+ ),
+ Connection(
+ conn_id="conn_password_empty_2",
+ conn_type=CONN_TYPE,
+ schema="http",
+ host="some.netloc",
+ ),
+ ]
+
+ conn_uris = {f"AIRFLOW_CONN_{c.conn_id.upper()}": c.get_uri() for c in connections}
+
+ with mock.patch.dict("os.environ", values=conn_uris):
+ yield
+
+
+class TestCheckWebhookResponseDecorator:
+ def test_ok_response(self):
+ """Test OK response."""
+
+ @check_webhook_response
+ def decorated():
+ return MOCK_WEBHOOK_RESPONSE
+
+ assert decorated() is MOCK_WEBHOOK_RESPONSE
+
+ @pytest.mark.parametrize(
+ "status_code,body",
+ [
+ (400, "invalid_payload"),
+ (403, "action_prohibited"),
+ (404, "channel_not_found"),
+ (410, "channel_is_archived"),
+ (500, "rollup_error"),
+ (418, "i_am_teapot"),
+ ],
+ )
+ def test_error_response(self, status_code, body):
+ """Test error response."""
+ test_response = WebhookResponse(url="foo://bar", status_code=status_code, body=body, headers={})
+
+ @check_webhook_response
+ def decorated():
+ return test_response
+
+ error_message = fr"Response body: '{body}', Status Code: {status_code}\."
+ with pytest.raises(AirflowException, match=error_message):
+ assert decorated()
+
+
+class TestSlackWebhookHook:
+ def test_no_credentials(self):
+ """Test missing credentials."""
+ error_message = r"Either `slack_webhook_conn_id` or `webhook_token` should be provided\."
+ with pytest.raises(AirflowException, match=error_message):
+ SlackWebhookHook(slack_webhook_conn_id=None, webhook_token=None)
+
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.mask_secret")
+ def test_webhook_token(self, mock_mask_secret):
+ webhook_token = "test-value"
+ warning_message = (
+ r"Provide `webhook_token` as hook argument deprecated by security reason and will be removed "
+ r"in a future releases. Please specify it in `Slack Webhook` connection\."
)
- db.merge_conn(
- Connection(
- conn_id='slack-webhook-host',
- conn_type='slackwebhook',
- host='https://hooks.slack.com/services/T000/',
+ with pytest.warns(DeprecationWarning, match=warning_message):
+ SlackWebhookHook(webhook_token=webhook_token)
+ mock_mask_secret.assert_called_once_with(webhook_token)
+
+ def test_conn_id(self):
+ """Different conn_id arguments and options."""
+ hook = SlackWebhookHook(slack_webhook_conn_id=SlackWebhookHook.default_conn_name, http_conn_id=None)
+ assert hook.slack_webhook_conn_id == SlackWebhookHook.default_conn_name
+ assert not hasattr(hook, "http_conn_id")
+
+ hook = SlackWebhookHook(slack_webhook_conn_id=None, http_conn_id=SlackWebhookHook.default_conn_name)
+ assert hook.slack_webhook_conn_id == SlackWebhookHook.default_conn_name
+ assert not hasattr(hook, "http_conn_id")
+
+ error_message = "You cannot provide both `slack_webhook_conn_id` and `http_conn_id`."
+ with pytest.raises(AirflowException, match=error_message):
+ SlackWebhookHook(
+ slack_webhook_conn_id=SlackWebhookHook.default_conn_name,
+ http_conn_id=SlackWebhookHook.default_conn_name,
)
+
+ @pytest.mark.parametrize(
+ "conn_id",
+ [
+ TEST_CONN_ID,
+ "conn_full_url_connection",
+ "conn_full_url_connection_with_host",
+ "conn_host_with_schema",
+ "conn_host_without_schema",
+ "conn_parts",
+ "conn_token_in_host_1",
+ "conn_token_in_host_2",
+ ],
+ )
+ def test_construct_webhook_url(self, conn_id):
+ """Test valid connections."""
+ hook = SlackWebhookHook(slack_webhook_conn_id=conn_id)
+ conn_params = hook._get_conn_params()
+ assert "url" in conn_params
+ assert conn_params["url"] == TEST_WEBHOOK_URL
+
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.mask_secret")
+ @pytest.mark.parametrize("conn_id", ["conn_token_in_host_1", "conn_token_in_host_2"])
+ def test_construct_webhook_url_deprecated_full_url_in_host(self, mock_mask_secret, conn_id):
+ """Test deprecated option with full URL in host/schema and empty password."""
+ hook = SlackWebhookHook(slack_webhook_conn_id=conn_id)
+ warning_message = (
+ r"Found Slack Webhook Token URL in Connection .* `host` and `password` field is empty\."
)
- db.merge_conn(
- Connection(
- conn_id='slack-webhook-with-password',
- conn_type='slackwebhook',
- password='your_token_here',
- )
+ with pytest.warns(DeprecationWarning, match=warning_message):
+ conn_params = hook._get_conn_params()
+ mock_mask_secret.assert_called_once_with(mock.ANY)
+ assert "url" in conn_params
+ assert conn_params["url"] == TEST_WEBHOOK_URL
+
+ @pytest.mark.parametrize(
+ "conn_id", ["conn_custom_endpoint_1", "conn_custom_endpoint_2", "conn_custom_endpoint_3"]
+ )
+ def test_construct_webhook_url_with_non_default_host(self, conn_id):
+ """Test valid connections with endpoint != https://hooks.slack.com/services."""
+ hook = SlackWebhookHook(slack_webhook_conn_id=conn_id)
+ conn_params = hook._get_conn_params()
+ assert "url" in conn_params
+ assert conn_params["url"] == TEST_CUSTOM_WEBHOOK_URL
+
+ @pytest.mark.parametrize(
+ "conn_id",
+ [
+ "conn_empty",
+ "conn_password_empty_1",
+ "conn_password_empty_2",
+ ],
+ )
+ def test_no_password_in_connection_field(self, conn_id):
+ """Test connections that are missing the password field."""
+ hook = SlackWebhookHook(slack_webhook_conn_id=conn_id)
+ error_message = r"Cannot get token\: No valid Slack token nor valid Connection ID supplied\."
+ with pytest.raises(AirflowException, match=error_message):
+ hook._get_conn_params()
+
+ @pytest.mark.parametrize("conn_id", [None, "conn_empty"])
+ @pytest.mark.parametrize("token", [TEST_TOKEN, TEST_WEBHOOK_URL, f"/{TEST_TOKEN}"])
+ def test_empty_connection_field_with_token(self, conn_id, token):
+ """Test a connection that is empty or not set when a valid webhook_token is specified."""
+ hook = SlackWebhookHook(slack_webhook_conn_id="conn_empty", webhook_token=token)
+ conn_params = hook._get_conn_params()
+ assert "url" in conn_params
+ assert conn_params["url"] == TEST_WEBHOOK_URL
+
+ @pytest.mark.parametrize(
+ "hook_config,conn_extra,expected",
+ [
+ ( # Test Case: hook config
+ {
+ "timeout": 42,
+ "proxy": "https://hook-proxy:1234",
+ "retry_handlers": [TEST_CONN_ERROR_RETRY_HANDLER, TEST_RATE_LIMIT_RETRY_HANDLER],
+ },
+ {},
+ {
+ "timeout": 42,
+ "proxy": "https://hook-proxy:1234",
+ "retry_handlers": [TEST_CONN_ERROR_RETRY_HANDLER, TEST_RATE_LIMIT_RETRY_HANDLER],
+ },
+ ),
+ ( # Test Case: connection config
+ {},
+ {
+ "timeout": 9000,
+ "proxy": "https://conn-proxy:4321",
+ },
+ {
+ "timeout": 9000,
+ "proxy": "https://conn-proxy:4321",
+ },
+ ),
+ ( # Test Case: Connection from the UI
+ {},
+ {
+ "extra__slackwebhook__timeout": 9000,
+ "extra__slackwebhook__proxy": "https://conn-proxy:4321",
+ },
+ {
+ "timeout": 9000,
+ "proxy": "https://conn-proxy:4321",
+ },
+ ),
+ ( # Test Case: Merge configs - hook args overwrite conn config
+ {
+ "timeout": 1,
+ "proxy": "https://hook-proxy:777",
+ },
+ {
+ "timeout": 9000,
+ "proxy": "https://conn-proxy:4321",
+ },
+ {
+ "timeout": 1,
+ "proxy": "https://hook-proxy:777",
+ },
+ ),
+ ( # Test Case: Merge configs - resolve config
+ {
+ "timeout": 1,
+ },
+ {
+ "timeout": 9000,
+ "proxy": "https://conn-proxy:4334",
+ },
+ {
+ "timeout": 1,
+ "proxy": "https://conn-proxy:4334",
+ },
+ ),
+ ( # Test Case: empty configs
+ {},
+ {},
+ {},
+ ),
+ ( # Test Case: extra_client_args
+ {"foo": "bar"},
+ {},
+ {"foo": "bar"},
+ ),
+ ( # Test Case: ignored not expected connection extra
+ {},
+ {"spam": "egg"},
+ {},
+ ),
+ ],
+ )
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient")
+ def test_client_configuration(
+ self, mock_webhook_client_cls, hook_config, conn_extra, expected: dict[str, Any]
+ ):
+ """Test read/parse/merge WebhookClient config from connection and hook arguments."""
+ expected["url"] = TEST_WEBHOOK_URL
+ test_conn = Connection(
+ conn_id="test-slack-incoming-webhook-conn",
+ conn_type=CONN_TYPE,
+ password=TEST_WEBHOOK_URL,
+ extra=conn_extra,
)
+ test_conn_env = f"AIRFLOW_CONN_{test_conn.conn_id.upper()}"
+ mock_webhook_client = mock_webhook_client_cls.return_value
- def test_get_token_manual_token(self):
- # Given
- manual_token = 'manual_token_here'
- hook = SlackWebhookHook(webhook_token=manual_token)
+ with mock.patch.dict("os.environ", values={test_conn_env: test_conn.get_uri()}):
+ hook = SlackWebhookHook(slack_webhook_conn_id=test_conn.conn_id, **hook_config)
+ expected["logger"] = hook.log
+ conn_params = hook._get_conn_params()
+ assert conn_params == expected
- # When
- webhook_token = hook._get_token(manual_token, None)
+ client = hook.client
+ assert client == mock_webhook_client
+ assert hook.get_conn() == mock_webhook_client
+ assert hook.get_conn() is client # cached
+ mock_webhook_client_cls.assert_called_once_with(**expected)
- # Then
- assert webhook_token == manual_token
+ @pytest.mark.parametrize("headers", [None, {"User-Agent": "Airflow"}])
+ @pytest.mark.parametrize(
+ "send_body",
+ [
+ {"text": "Test Text"},
+ {"text": "Fallback Text", "blocks": ["Dummy Block"]},
+ {"text": "Fallback Text", "blocks": ["Dummy Block"], "unfurl_media": True, "unfurl_links": True},
+ ],
+ )
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient")
+ def test_hook_send_dict(self, mock_webhook_client_cls, send_body, headers):
+ """Test `SlackWebhookHook.send_dict` method."""
+ mock_webhook_client = mock_webhook_client_cls.return_value
+ mock_webhook_client_send_dict = mock_webhook_client.send_dict
+ mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE
- def test_get_token_conn_id(self):
- # Given
- conn_id = 'slack-webhook-default'
- hook = SlackWebhookHook(http_conn_id=conn_id)
- expected_webhook_token = 'your_token_here'
+ hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID)
+ # Test with regular dictionary
+ hook.send_dict(body=send_body, headers=headers)
+ mock_webhook_client_send_dict.assert_called_once_with(send_body, headers=headers)
- # When
- webhook_token = hook._get_token(None, conn_id)
+ # Test with JSON-string
+ mock_webhook_client_send_dict.reset_mock()
+ hook.send_dict(body=json.dumps(send_body), headers=headers)
+ mock_webhook_client_send_dict.assert_called_once_with(send_body, headers=headers)
- # Then
- assert webhook_token == expected_webhook_token
+ @pytest.mark.parametrize("send_body", [("text", "Test Text"), 42, "null", "42"])
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient")
+ def test_hook_send_dict_invalid_type(self, mock_webhook_client_cls, send_body):
+ """Test invalid body type for `SlackWebhookHook.send_dict` method."""
+ mock_webhook_client = mock_webhook_client_cls.return_value
+ mock_webhook_client_send_dict = mock_webhook_client.send_dict
+ mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE
- def test_get_token_conn_id_password(self):
- # Given
- conn_id = 'slack-webhook-with-password'
- hook = SlackWebhookHook(http_conn_id=conn_id)
- expected_webhook_token = 'your_token_here'
+ hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID)
+ with pytest.raises(TypeError, match=r"Body expected dictionary, got .*\."):
+ hook.send_dict(body=send_body)
+ assert mock_webhook_client_send_dict.assert_not_called
- # When
- webhook_token = hook._get_token(None, conn_id)
+ @pytest.mark.parametrize("json_string", ["{'text': 'Single quotes'}", '{"text": "Missing }"'])
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient")
+ def test_hook_send_dict_invalid_json_string(self, mock_webhook_client_cls, json_string):
+ """Test invalid JSON-string passed to `SlackWebhookHook.send_dict` method."""
+ mock_webhook_client = mock_webhook_client_cls.return_value
+ mock_webhook_client_send_dict = mock_webhook_client.send_dict
+ mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE
- # Then
- assert webhook_token == expected_webhook_token
+ hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID)
+ error_message = r"Body expected valid JSON string, got .*\. Original error:.*"
+ with pytest.raises(AirflowException, match=error_message):
+ hook.send_dict(body=json_string)
+ assert mock_webhook_client_send_dict.assert_not_called
- def test_build_slack_message(self):
- # Given
- hook = SlackWebhookHook(**self._config)
+ @pytest.mark.parametrize(
+ "legacy_attr",
+ [
+ "channel",
+ "username",
+ "icon_emoji",
+ "icon_url",
+ ],
+ )
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient")
+ def test_hook_send_dict_legacy_slack_integration(self, mock_webhook_client_cls, legacy_attr):
+ """Test `SlackWebhookHook.send_dict` warns users about Legacy Slack Integrations."""
+ mock_webhook_client = mock_webhook_client_cls.return_value
+ mock_webhook_client_send_dict = mock_webhook_client.send_dict
+ mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE
- # When
- message = hook._build_slack_message()
+ legacy_slack_integration_body = {legacy_attr: "test-value"}
+ hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID)
+ warning_message = (
+ r"You cannot override the default channel \(chosen by the user who installed your app\), "
+ r"username, or icon when you're using Incoming Webhooks to post messages\. "
+ r"Instead, these values will always inherit from the associated Slack app configuration\. "
+ r"See: .*\. It is possible to change this values only in "
+ r"Legacy Slack Integration Incoming Webhook: .*"
+ )
+ with pytest.warns(UserWarning, match=warning_message):
+ hook.send_dict(body=legacy_slack_integration_body)
+ mock_webhook_client_send_dict.assert_called_once_with(legacy_slack_integration_body, headers=None)
- # Then
- assert self.expected_message_dict == json.loads(message)
+ @pytest.mark.parametrize("headers", [None, {"User-Agent": "Airflow"}])
+ @pytest.mark.parametrize(
+ "send_params",
+ [
+ {"text": "Test Text"},
+ {"text": "Fallback Text", "blocks": ["Dummy Block"]},
+ {"text": "Fallback Text", "blocks": ["Dummy Block"], "unfurl_media": True, "unfurl_links": True},
+ ],
+ )
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook.send_dict")
+ def test_hook_send(self, mock_hook_send_dict, send_params, headers):
+ """Test `SlackWebhookHook.send` method."""
+ hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID)
+ hook.send(**send_params, headers=headers)
+ mock_hook_send_dict.assert_called_once_with(body=send_params, headers=headers)
- @mock.patch('requests.Session')
- @mock.patch('requests.Request')
- def test_url_generated_by_http_conn_id(self, mock_request, mock_session):
- hook = SlackWebhookHook(http_conn_id='slack-webhook-url')
- try:
- hook.execute()
- except MissingSchema:
- pass
- mock_request.assert_called_once_with(
- self.expected_method, self.expected_url, headers=mock.ANY, data=mock.ANY
+ @pytest.mark.parametrize(
+ "deprecated_hook_attr",
+ [
+ "message",
+ "attachments",
+ "blocks",
+ "channel",
+ "username",
+ "icon_emoji",
+ "icon_url",
+ ],
+ )
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook.send_dict")
+ def test_hook_send_by_hook_attributes(self, mock_hook_send_dict, deprecated_hook_attr):
+ """Test `SlackWebhookHook.send` with parameters set in hook attributes."""
+ send_params = {deprecated_hook_attr: "test-value"}
+ expected_body = {deprecated_hook_attr if deprecated_hook_attr != "message" else "text": "test-value"}
+ warning_message = (
+ r"Provide .* as hook argument\(s\) is deprecated and will be removed in a future releases\. "
+ r"Please specify attributes in `SlackWebhookHook\.send` method instead\."
)
- mock_request.reset_mock()
+ with pytest.warns(DeprecationWarning, match=warning_message):
+ hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID, **send_params)
+ assert getattr(hook, deprecated_hook_attr) == "test-value"
+ if deprecated_hook_attr == "message":
+ assert getattr(hook, "text") == "test-value"
+ # Test ``.send()`` method
+ hook.send()
+ mock_hook_send_dict.assert_called_once_with(body=expected_body, headers=None)
- @mock.patch('requests.Session')
- @mock.patch('requests.Request')
- def test_url_generated_by_endpoint(self, mock_request, mock_session):
- hook = SlackWebhookHook(webhook_token=self.expected_url)
- try:
+ # Test deprecated ``.execute()`` method
+ mock_hook_send_dict.reset_mock()
+ warning_message = (
+ "`SlackWebhookHook.execute` method deprecated and will be removed in a future releases. "
+ "Please use `SlackWebhookHook.send` or `SlackWebhookHook.send_dict` or "
+ "`SlackWebhookHook.send_text` methods instead."
+ )
+ with pytest.warns(DeprecationWarning, match=warning_message):
hook.execute()
- except MissingSchema:
- pass
- mock_request.assert_called_once_with(
- self.expected_method, self.expected_url, headers=mock.ANY, data=mock.ANY
+ mock_hook_send_dict.assert_called_once_with(body=expected_body, headers=None)
+
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient")
+ def test_hook_ignored_attributes(self, mock_webhook_client_cls, recwarn):
+ """Test hook constructor warns users about ignored attributes."""
+ mock_webhook_client = mock_webhook_client_cls.return_value
+ mock_webhook_client_send_dict = mock_webhook_client.send_dict
+ mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE
+
+ hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID, link_names="test-value")
+ assert len(recwarn) == 2
+ assert str(recwarn.pop(UserWarning).message).startswith(
+ "`link_names` has no affect, if you want to mention user see:"
+ )
+ assert str(recwarn.pop(DeprecationWarning).message).startswith(
+ "Provide 'link_names' as hook argument(s) is deprecated and will be removed in a future releases."
)
- mock_request.reset_mock()
+ hook.send()
+ mock_webhook_client_send_dict.assert_called_once_with({}, headers=None)
- @mock.patch('requests.Session')
- @mock.patch('requests.Request')
- def test_url_generated_by_http_conn_id_and_endpoint(self, mock_request, mock_session):
- hook = SlackWebhookHook(http_conn_id='slack-webhook-host', webhook_token='B000/XXX')
- try:
- hook.execute()
- except MissingSchema:
- pass
- mock_request.assert_called_once_with(
- self.expected_method, self.expected_url, headers=mock.ANY, data=mock.ANY
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.WebhookClient")
+ def test_hook_send_unexpected_arguments(self, mock_webhook_client_cls, recwarn):
+ """Test `SlackWebhookHook.send` unexpected attributes."""
+ mock_webhook_client = mock_webhook_client_cls.return_value
+ mock_webhook_client_send_dict = mock_webhook_client.send_dict
+ mock_webhook_client_send_dict.return_value = MOCK_WEBHOOK_RESPONSE
+
+ hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID)
+ warning_message = (
+ r"Found unexpected keyword-argument\(s\) 'link_names', 'as_user' "
+ r"in `send` method\. This argument\(s\) have no effect\."
+ )
+ with pytest.warns(UserWarning, match=warning_message):
+ hook.send(link_names="foo-bar", as_user="root", text="Awesome!")
+
+ mock_webhook_client_send_dict.assert_called_once_with({"text": "Awesome!"}, headers=None)
+
+ @pytest.mark.parametrize("headers", [None, {"User-Agent": "Airflow"}])
+ @pytest.mark.parametrize("unfurl_links", [None, False, True])
+ @pytest.mark.parametrize("unfurl_media", [None, False, True])
+ @mock.patch("airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook.send")
+ def test_hook_send_text(self, mock_hook_send, headers, unfurl_links, unfurl_media):
+ """Test `SlackWebhookHook.send_text` method."""
+ hook = SlackWebhookHook(slack_webhook_conn_id=TEST_CONN_ID)
+ hook.send_text("Test Text", headers=headers, unfurl_links=unfurl_links, unfurl_media=unfurl_media)
+ mock_hook_send.assert_called_once_with(
+ text="Test Text", headers=headers, unfurl_links=unfurl_links, unfurl_media=unfurl_media
)
- mock_request.reset_mock()
diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py
index 5df4df40a4..b0beed1557 100644
--- a/tests/providers/slack/transfers/test_sql_to_slack.py
+++ b/tests/providers/slack/transfers/test_sql_to_slack.py
@@ -21,7 +21,7 @@ from unittest import mock
import pandas as pd
import pytest
-from airflow import AirflowException
+from airflow.exceptions import AirflowException
from airflow.models import DAG, Connection
from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
from airflow.utils import timezone
@@ -64,14 +64,15 @@ class TestSqlToSlackOperator:
# Test that the Slack hook is instantiated with the right parameters
mock_slack_hook_class.assert_called_once_with(
- http_conn_id='slack_connection',
- message=f'message: 2017-01-01, {test_df}',
- channel='#test',
+ slack_webhook_conn_id='slack_connection',
webhook_token=None,
)
- # Test that the Slack hook's execute method gets run once
- slack_webhook_hook.execute.assert_called_once()
+ # Test that the `SlackWebhookHook.send` method gets run once
+ slack_webhook_hook.send.assert_called_once_with(
+ text=f'message: 2017-01-01, {test_df}',
+ channel='#test',
+ )
@mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
def test_rendering_and_message_execution_with_slack_hook(self, mock_slack_hook_class):
@@ -98,14 +99,15 @@ class TestSqlToSlackOperator:
# Test that the Slack hook is instantiated with the right parameters
mock_slack_hook_class.assert_called_once_with(
- http_conn_id='slack_connection',
- message=f'message: 2017-01-01, {test_df}',
- channel='#test',
+ slack_webhook_conn_id='slack_connection',
webhook_token='test_token',
)
- # Test that the Slack hook's execute method gets run once
- slack_webhook_hook.execute.assert_called_once()
+ # Test that the `SlackWebhookHook.send` method gets run once
+ slack_webhook_hook.send.assert_called_once_with(
+ text=f'message: 2017-01-01, {test_df}',
+ channel='#test',
+ )
def test_non_existing_slack_parameters_provided_exception_thrown(self):
operator_args = {
@@ -141,14 +143,15 @@ class TestSqlToSlackOperator:
# Test that the Slack hook is instantiated with the right parameters
mock_slack_hook_class.assert_called_once_with(
- http_conn_id='slack_connection',
- message=f'message: 2017-01-01, {test_df}',
- channel='#test',
+ slack_webhook_conn_id='slack_connection',
webhook_token=None,
)
- # Test that the Slack hook's execute method gets run once
- slack_webhook_hook.execute.assert_called_once()
+ # Test that the `SlackWebhookHook.send` method gets run once
+ slack_webhook_hook.send.assert_called_once_with(
+ text=f'message: 2017-01-01, {test_df}',
+ channel='#test',
+ )
@mock.patch('airflow.providers.common.sql.operators.sql.BaseHook.get_connection')
def test_hook_params_building(self, mock_get_conn):
diff --git a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py
index 7dffa8a8f9..bbc02abbde 100644
--- a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py
+++ b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py
@@ -46,6 +46,7 @@ class TestSnowflakeToSlackOperator:
@mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
def test_hooks_and_rendering(self, mock_slack_hook_class):
+ slack_webhook_hook = mock_slack_hook_class.return_value
operator_args = {
'snowflake_conn_id': 'snowflake_connection',
'sql': "sql {{ ds }}",
@@ -71,10 +72,14 @@ class TestSnowflakeToSlackOperator:
# Test that the Slack hook is instantiated with the right parameters
mock_slack_hook_class.assert_called_once_with(
- message='message: 2017-01-01, 1234',
+ slack_webhook_conn_id='slack_default',
webhook_token='test_token',
+ )
+
+ # Test that the `SlackWebhookHook.send` method gets run once
+ slack_webhook_hook.send.assert_called_once_with(
+ text='message: 2017-01-01, 1234',
channel=None,
- http_conn_id='slack_default',
)
def test_hook_params_building(self):