Posted to commits@airflow.apache.org by el...@apache.org on 2022/06/29 12:28:22 UTC

[airflow] branch main updated: Adding generic `SqlToSlackOperator` (#24663)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 13908c2c91 Adding generic `SqlToSlackOperator` (#24663)
13908c2c91 is described below

commit 13908c2c914cf08f9d962a4d3b6ae54fbdf1d223
Author: Alex Kruchkov <36...@users.noreply.github.com>
AuthorDate: Wed Jun 29 15:28:07 2022 +0300

    Adding generic `SqlToSlackOperator` (#24663)
    
    * adding `SqlToSlackOperator`
    
    Co-authored-by: eladkal <45...@users.noreply.github.com>
---
 .../providers/presto/transfers/presto_to_slack.py  |  87 ++-------
 airflow/providers/slack/provider.yaml              |   6 +
 airflow/providers/slack/transfers/__init__.py      |  16 ++
 .../transfers/sql_to_slack.py}                     | 122 ++++++++----
 .../snowflake/transfers/snowflake_to_slack.py      | 103 +++-------
 docs/apache-airflow-providers-slack/index.rst      |  10 +
 .../operators/sql_to_slack.rst                     |  38 ++++
 .../pre_commit_check_2_2_compatibility.py          |   5 +-
 .../presto/transfers/test_presto_to_slack.py       |  48 ++++-
 tests/providers/slack/transfers/__init__.py        |  17 ++
 .../providers/slack/transfers/test_sql_to_slack.py | 213 +++++++++++++++++++++
 .../snowflake/transfers/test_snowflake_to_slack.py |  96 +++++++---
 .../providers/presto/example_presto_to_slack.py    |   2 +
 tests/system/providers/slack/__init__.py           |  16 ++
 .../example_sql_to_slack.py}                       |  19 +-
 15 files changed, 575 insertions(+), 223 deletions(-)
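
For a sense of how the new generic operator is meant to be used, here is a minimal sketch based on the
constructor signature and docstring added below; the connection ids, table name and channel are
placeholders rather than part of this commit:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator

    with DAG(dag_id="sql_to_slack_sketch", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
        # Run the query on the hook resolved from `sql_conn_id` and render the resulting
        # dataframe into `slack_message` through the `results_df` JINJA variable; the
        # `tabulate` filter is registered by the operator itself.
        SqlToSlackOperator(
            task_id="sql_to_slack",
            sql_conn_id="my_sql_connection",  # any connection whose hook implements get_pandas_df
            sql="SELECT col FROM my_table",
            slack_conn_id="slack_default",  # alternatively, pass slack_webhook_token
            slack_channel="#my_channel",
            slack_message='results: {{ results_df | tabulate(tablefmt="pretty", headers="keys") }}',
        )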

diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py
index 6dd0ecb3ab..bcf2c3177a 100644
--- a/airflow/providers/presto/transfers/presto_to_slack.py
+++ b/airflow/providers/presto/transfers/presto_to_slack.py
@@ -15,21 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union
+import warnings
+from typing import Iterable, Mapping, Optional, Sequence, Union
 
-from pandas import DataFrame
-from tabulate import tabulate
+from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
 
-from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
-from airflow.providers.presto.hooks.presto import PrestoHook
-from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
 
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
-
-class PrestoToSlackOperator(BaseOperator):
+class PrestoToSlackOperator(SqlToSlackOperator):
     """
     Executes a single SQL statement in Presto and sends the results to Slack. The results of the query are
     rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
@@ -73,8 +65,6 @@ class PrestoToSlackOperator(BaseOperator):
         slack_channel: Optional[str] = None,
         **kwargs,
     ) -> None:
-        super().__init__(**kwargs)
-
         self.presto_conn_id = presto_conn_id
         self.sql = sql
         self.parameters = parameters
@@ -84,58 +74,23 @@ class PrestoToSlackOperator(BaseOperator):
         self.results_df_name = results_df_name
         self.slack_channel = slack_channel
 
-    def _get_query_results(self) -> DataFrame:
-        presto_hook = self._get_presto_hook()
-
-        self.log.info('Running SQL query: %s', self.sql)
-        df = presto_hook.get_pandas_df(self.sql, parameters=self.parameters)
-        return df
-
-    def _render_and_send_slack_message(self, context, df) -> None:
-        # Put the dataframe into the context and render the JINJA template fields
-        context[self.results_df_name] = df
-        self.render_template_fields(context)
-
-        slack_hook = self._get_slack_hook()
-        self.log.info('Sending slack message: %s', self.slack_message)
-        slack_hook.execute()
-
-    def _get_presto_hook(self) -> PrestoHook:
-        return PrestoHook(presto_conn_id=self.presto_conn_id)
+        warnings.warn(
+            """
+            PrestoToSlackOperator is deprecated.
+            Please use `airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator`.
+            """,
+            DeprecationWarning,
+            stacklevel=2,
+        )
 
-    def _get_slack_hook(self) -> SlackWebhookHook:
-        return SlackWebhookHook(
-            http_conn_id=self.slack_conn_id,
-            message=self.slack_message,
-            webhook_token=self.slack_token,
+        super().__init__(
+            sql=self.sql,
+            sql_conn_id=self.presto_conn_id,
+            slack_conn_id=self.slack_conn_id,
+            slack_webhook_token=self.slack_token,
+            slack_message=self.slack_message,
             slack_channel=self.slack_channel,
+            results_df_name=self.results_df_name,
+            parameters=self.parameters,
+            **kwargs,
         )
-
-    def render_template_fields(self, context, jinja_env=None) -> None:
-        # If this is the first render of the template fields, exclude slack_message from rendering since
-        # the presto results haven't been retrieved yet.
-        if self.times_rendered == 0:
-            fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields)
-        else:
-            fields_to_render = self.template_fields
-
-        if not jinja_env:
-            jinja_env = self.get_template_env()
-
-        # Add the tabulate library into the JINJA environment
-        jinja_env.filters['tabulate'] = tabulate
-
-        self._do_render_template_fields(self, fields_to_render, context, jinja_env, set())
-        self.times_rendered += 1
-
-    def execute(self, context: 'Context') -> None:
-        if not self.sql.strip():
-            raise AirflowException("Expected 'sql' parameter is missing.")
-        if not self.slack_message.strip():
-            raise AirflowException("Expected 'slack_message' parameter is missing.")
-
-        df = self._get_query_results()
-
-        self._render_and_send_slack_message(context, df)
-
-        self.log.debug('Finished sending Presto data to Slack')
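
With this change PrestoToSlackOperator becomes a thin deprecated shim that forwards its arguments to
SqlToSlackOperator (and emits a DeprecationWarning). Based on the argument mapping in the
super().__init__() call above, migrating an existing task looks roughly like this (placeholder values):

    from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator

    # Previously: PrestoToSlackOperator(task_id=..., presto_conn_id="presto_default", ...)
    SqlToSlackOperator(
        task_id="presto_to_slack",
        sql_conn_id="presto_default",  # was presto_conn_id
        sql="SELECT col FROM test_table",
        slack_conn_id="slack_default",
        slack_message="message: {{ results_df }}",
    )
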
diff --git a/airflow/providers/slack/provider.yaml b/airflow/providers/slack/provider.yaml
index f54d70710c..cfd1ef89b7 100644
--- a/airflow/providers/slack/provider.yaml
+++ b/airflow/providers/slack/provider.yaml
@@ -57,6 +57,12 @@ hooks:
       - airflow.providers.slack.hooks.slack
       - airflow.providers.slack.hooks.slack_webhook
 
+transfers:
+  - source-integration-name: SQL
+    target-integration-name: Slack
+    python-module: airflow.providers.slack.transfers.sql_to_slack
+    how-to-guide: /docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
+
 connection-types:
   - hook-class-name: airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook
     connection-type: slackwebhook
diff --git a/airflow/providers/slack/transfers/__init__.py b/airflow/providers/slack/transfers/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/slack/transfers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/snowflake/transfers/snowflake_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py
similarity index 54%
copy from airflow/providers/snowflake/transfers/snowflake_to_slack.py
copy to airflow/providers/slack/transfers/sql_to_slack.py
index 2c6138e58e..24a3ed93c6 100644
--- a/airflow/providers/snowflake/transfers/snowflake_to_slack.py
+++ b/airflow/providers/slack/transfers/sql_to_slack.py
@@ -14,49 +14,76 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import warnings
 from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union
 
 from pandas import DataFrame
 from tabulate import tabulate
 
 from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.hooks.dbapi import DbApiHook
 from airflow.models import BaseOperator
 from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
-from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.providers_manager import ProvidersManager
+from airflow.utils.module_loading import import_string
+from airflow.version import version
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
-class SnowflakeToSlackOperator(BaseOperator):
+def _backported_get_hook(connection, *, hook_params=None):
+    """Return hook based on conn_type
+    For supporting Airflow versions < 2.3, we backport "get_hook()" method. This should be removed
+    when "apache-airflow-providers-slack" will depend on Airflow >= 2.3.
+    """
+    hook = ProvidersManager().hooks.get(connection.conn_type, None)
+
+    if hook is None:
+        raise AirflowException(f'Unknown hook type "{connection.conn_type}"')
+    try:
+        hook_class = import_string(hook.hook_class_name)
+    except ImportError:
+        warnings.warn(
+            f"Could not import {hook.hook_class_name} when discovering "
+            f"{hook.hook_name} {hook.package_name}",
+        )
+        raise
+    if hook_params is None:
+        hook_params = {}
+    return hook_class(**{hook.connection_id_attribute_name: connection.conn_id}, **hook_params)
+
+
+class SqlToSlackOperator(BaseOperator):
     """
-    Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are
-    rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
-    results_df }}'. The 'results_df' variable name can be changed by specifying a different
+    Executes an SQL statement in a given SQL connection and sends the results to Slack. The results of the
+    query are rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called
+    '{{ results_df }}'. The 'results_df' variable name can be changed by specifying a different
     'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to
     allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df |
     tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table.
 
     .. seealso::
         For more information on how to use this operator, take a look at the guide:
-        :ref:`howto/operator:SnowflakeToSlackOperator`
+        :ref:`howto/operator:SqlToSlackOperator`
 
     :param sql: The SQL statement to execute on Snowflake (templated)
     :param slack_message: The templated Slack message to send with the data returned from Snowflake.
         You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the
         SQL results
-    :param snowflake_conn_id: Reference to
+    :param sql_conn_id: Reference to
         :ref:`Snowflake connection id<howto/connection:snowflake>`
-    :param slack_conn_id: The connection id for Slack
+    :param sql_hook_params: Extra config params to be passed to the underlying hook.
+           Should match the desired hook constructor params.
+    :param slack_conn_id: The connection id for Slack.
+    :param slack_webhook_token: The token to use to authenticate to Slack. If this is not provided, the
+        webhook token must be stored in the 'password' field of the 'slack_conn_id' connection.
+    :param slack_channel: The channel to send the message to, overriding the Slack connection default.
     :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df'
     :param parameters: The parameters to pass to the SQL query
-    :param warehouse: The Snowflake virtual warehouse to use to run the SQL query
-    :param database: The Snowflake database to use for the SQL query
-    :param schema: The schema to run the SQL against in Snowflake
-    :param role: The role to use when connecting to Snowflake
-    :param slack_token: The token to use to authenticate to Slack. If this is not provided, the
-        'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id
     """
 
     template_fields: Sequence[str] = ('sql', 'slack_message')
@@ -68,37 +95,56 @@ class SnowflakeToSlackOperator(BaseOperator):
         self,
         *,
         sql: str,
+        sql_conn_id: str,
+        sql_hook_params: Optional[dict] = None,
+        slack_conn_id: Optional[str] = None,
+        slack_webhook_token: Optional[str] = None,
+        slack_channel: Optional[str] = None,
         slack_message: str,
-        snowflake_conn_id: str = 'snowflake_default',
-        slack_conn_id: str = 'slack_default',
         results_df_name: str = 'results_df',
         parameters: Optional[Union[Iterable, Mapping]] = None,
-        warehouse: Optional[str] = None,
-        database: Optional[str] = None,
-        schema: Optional[str] = None,
-        role: Optional[str] = None,
-        slack_token: Optional[str] = None,
         **kwargs,
     ) -> None:
+
         super().__init__(**kwargs)
 
-        self.snowflake_conn_id = snowflake_conn_id
+        self.sql_conn_id = sql_conn_id
+        self.sql_hook_params = sql_hook_params
         self.sql = sql
         self.parameters = parameters
-        self.warehouse = warehouse
-        self.database = database
-        self.schema = schema
-        self.role = role
         self.slack_conn_id = slack_conn_id
-        self.slack_token = slack_token
+        self.slack_webhook_token = slack_webhook_token
+        self.slack_channel = slack_channel
         self.slack_message = slack_message
         self.results_df_name = results_df_name
+        self.kwargs = kwargs
+
+        if not self.slack_conn_id and not self.slack_webhook_token:
+            raise AirflowException(
+                "SqlToSlackOperator requires either a `slack_conn_id` or a `slack_webhook_token` argument"
+            )
+
+    def _get_hook(self) -> DbApiHook:
+        self.log.debug("Get connection for %s", self.sql_conn_id)
+        conn = BaseHook.get_connection(self.sql_conn_id)
+        if version >= '2.3':
+            # "hook_params" were introduced to into "get_hook()" only in Airflow 2.3.
+            hook = conn.get_hook(hook_params=self.sql_hook_params)  # ignore airflow compat check
+        else:
+            # For Airflow versions < 2.3 we backport the "get_hook()" method. This should be removed
+            # once "apache-airflow-providers-slack" depends on Airflow >= 2.3.
+            hook = _backported_get_hook(conn, hook_params=self.sql_hook_params)
+        if not callable(getattr(hook, 'get_pandas_df', None)):
+            raise AirflowException(
+                "This hook is not supported. The hook class must have get_pandas_df method."
+            )
+        return hook
 
     def _get_query_results(self) -> DataFrame:
-        snowflake_hook = self._get_snowflake_hook()
+        sql_hook = self._get_hook()
 
         self.log.info('Running SQL query: %s', self.sql)
-        df = snowflake_hook.get_pandas_df(self.sql, parameters=self.parameters)
+        df = sql_hook.get_pandas_df(self.sql, parameters=self.parameters)
         return df
 
     def _render_and_send_slack_message(self, context, df) -> None:
@@ -110,18 +156,12 @@ class SnowflakeToSlackOperator(BaseOperator):
         self.log.info('Sending slack message: %s', self.slack_message)
         slack_hook.execute()
 
-    def _get_snowflake_hook(self) -> SnowflakeHook:
-        return SnowflakeHook(
-            snowflake_conn_id=self.snowflake_conn_id,
-            warehouse=self.warehouse,
-            database=self.database,
-            role=self.role,
-            schema=self.schema,
-        )
-
     def _get_slack_hook(self) -> SlackWebhookHook:
         return SlackWebhookHook(
-            http_conn_id=self.slack_conn_id, message=self.slack_message, webhook_token=self.slack_token
+            http_conn_id=self.slack_conn_id,
+            message=self.slack_message,
+            channel=self.slack_channel,
+            webhook_token=self.slack_webhook_token,
         )
 
     def render_template_fields(self, context, jinja_env=None) -> None:
@@ -152,4 +192,4 @@ class SnowflakeToSlackOperator(BaseOperator):
         df = self._get_query_results()
         self._render_and_send_slack_message(context, df)
 
-        self.log.debug('Finished sending Snowflake data to Slack')
+        self.log.debug('Finished sending SQL data to Slack')
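
The sql_hook_params introduced here are forwarded to the resolved hook's constructor (through
conn.get_hook(hook_params=...) on Airflow >= 2.3, or the backported helper otherwise), and the hook only
needs to expose get_pandas_df(). A short sketch mirroring the new test_hook_params test, assuming a
Postgres connection named "postgres_test" and using a placeholder webhook token:

    SqlToSlackOperator(
        task_id="with_hook_params",
        sql_conn_id="postgres_test",
        sql="SELECT 1",
        sql_hook_params={"schema": "public"},  # passed to the hook constructor, e.g. PostgresHook(schema="public")
        slack_webhook_token="T00000000/B00000000/XXXX",  # placeholder; or use slack_conn_id instead
        slack_message="result: {{ results_df }}",
    )
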
diff --git a/airflow/providers/snowflake/transfers/snowflake_to_slack.py b/airflow/providers/snowflake/transfers/snowflake_to_slack.py
index 2c6138e58e..29e0ccbd23 100644
--- a/airflow/providers/snowflake/transfers/snowflake_to_slack.py
+++ b/airflow/providers/snowflake/transfers/snowflake_to_slack.py
@@ -15,21 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union
+import warnings
+from typing import Iterable, Mapping, Optional, Sequence, Union
 
-from pandas import DataFrame
-from tabulate import tabulate
+from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
 
-from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
-from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
-from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
 
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
-
-class SnowflakeToSlackOperator(BaseOperator):
+class SnowflakeToSlackOperator(SqlToSlackOperator):
     """
     Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are
     rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
@@ -48,7 +40,7 @@ class SnowflakeToSlackOperator(BaseOperator):
         SQL results
     :param snowflake_conn_id: Reference to
         :ref:`Snowflake connection id<howto/connection:snowflake>`
-    :param slack_conn_id: The connection id for Slack
+    :param slack_conn_id: The connection id for Slack.
     :param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df'
     :param parameters: The parameters to pass to the SQL query
     :param warehouse: The Snowflake virtual warehouse to use to run the SQL query
@@ -56,7 +48,7 @@ class SnowflakeToSlackOperator(BaseOperator):
     :param schema: The schema to run the SQL against in Snowflake
     :param role: The role to use when connecting to Snowflake
     :param slack_token: The token to use to authenticate to Slack. If this is not provided, the
-        'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id
+        'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id.
     """
 
     template_fields: Sequence[str] = ('sql', 'slack_message')
@@ -80,8 +72,6 @@ class SnowflakeToSlackOperator(BaseOperator):
         slack_token: Optional[str] = None,
         **kwargs,
     ) -> None:
-        super().__init__(**kwargs)
-
         self.snowflake_conn_id = snowflake_conn_id
         self.sql = sql
         self.parameters = parameters
@@ -94,62 +84,31 @@ class SnowflakeToSlackOperator(BaseOperator):
         self.slack_message = slack_message
         self.results_df_name = results_df_name
 
-    def _get_query_results(self) -> DataFrame:
-        snowflake_hook = self._get_snowflake_hook()
-
-        self.log.info('Running SQL query: %s', self.sql)
-        df = snowflake_hook.get_pandas_df(self.sql, parameters=self.parameters)
-        return df
-
-    def _render_and_send_slack_message(self, context, df) -> None:
-        # Put the dataframe into the context and render the JINJA template fields
-        context[self.results_df_name] = df
-        self.render_template_fields(context)
-
-        slack_hook = self._get_slack_hook()
-        self.log.info('Sending slack message: %s', self.slack_message)
-        slack_hook.execute()
-
-    def _get_snowflake_hook(self) -> SnowflakeHook:
-        return SnowflakeHook(
-            snowflake_conn_id=self.snowflake_conn_id,
-            warehouse=self.warehouse,
-            database=self.database,
-            role=self.role,
-            schema=self.schema,
+        warnings.warn(
+            """
+            SnowflakeToSlackOperator is deprecated.
+            Please use `airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator`.
+            """,
+            DeprecationWarning,
+            stacklevel=2,
         )
 
-    def _get_slack_hook(self) -> SlackWebhookHook:
-        return SlackWebhookHook(
-            http_conn_id=self.slack_conn_id, message=self.slack_message, webhook_token=self.slack_token
+        hook_params = {
+            "schema": self.schema,
+            "role": self.role,
+            "database": self.database,
+            "warehouse": self.warehouse,
+        }
+        cleaned_hook_params = {k: v for k, v in hook_params.items() if v is not None}
+
+        super().__init__(
+            sql=self.sql,
+            sql_conn_id=self.snowflake_conn_id,
+            slack_conn_id=self.slack_conn_id,
+            slack_webhook_token=self.slack_token,
+            slack_message=self.slack_message,
+            results_df_name=self.results_df_name,
+            parameters=self.parameters,
+            sql_hook_params=cleaned_hook_params,
+            **kwargs,
         )
-
-    def render_template_fields(self, context, jinja_env=None) -> None:
-        # If this is the first render of the template fields, exclude slack_message from rendering since
-        # the snowflake results haven't been retrieved yet.
-        if self.times_rendered == 0:
-            fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields)
-        else:
-            fields_to_render = self.template_fields
-
-        if not jinja_env:
-            jinja_env = self.get_template_env()
-
-        # Add the tabulate library into the JINJA environment
-        jinja_env.filters['tabulate'] = tabulate
-
-        self._do_render_template_fields(self, fields_to_render, context, jinja_env, set())
-        self.times_rendered += 1
-
-    def execute(self, context: 'Context') -> None:
-        if not isinstance(self.sql, str):
-            raise AirflowException("Expected 'sql' parameter should be a string.")
-        if self.sql is None or self.sql.strip() == "":
-            raise AirflowException("Expected 'sql' parameter is missing.")
-        if self.slack_message is None or self.slack_message.strip() == "":
-            raise AirflowException("Expected 'slack_message' parameter is missing.")
-
-        df = self._get_query_results()
-        self._render_and_send_slack_message(context, df)
-
-        self.log.debug('Finished sending Snowflake data to Slack')
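
SnowflakeToSlackOperator follows the same pattern: its Snowflake-specific arguments (warehouse, database,
schema, role) are collected into sql_hook_params before delegating to SqlToSlackOperator, so a direct call
to the generic operator would look roughly like this (placeholder values):

    SqlToSlackOperator(
        task_id="snowflake_to_slack",
        sql_conn_id="snowflake_default",
        sql="SELECT col FROM test_table",
        sql_hook_params={
            "warehouse": "test_warehouse",
            "database": "test_database",
            "schema": "test_schema",
            "role": "test_role",
        },
        slack_conn_id="slack_default",
        slack_message="message: {{ results_df }}",
    )
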
diff --git a/docs/apache-airflow-providers-slack/index.rst b/docs/apache-airflow-providers-slack/index.rst
index 87b299c2f7..eb833733a8 100644
--- a/docs/apache-airflow-providers-slack/index.rst
+++ b/docs/apache-airflow-providers-slack/index.rst
@@ -21,13 +21,23 @@
 
 Content
 -------
+.. toctree::
+    :hidden:
+    :caption: System tests
 
+    System Tests <_api/tests/system/providers/slack/index>
 .. toctree::
     :maxdepth: 1
     :caption: Guides
 
     How-to Guide <operators/slack_operator_howto_guide>
 
+.. toctree::
+    :maxdepth: 1
+    :caption: Guides
+
+    SqlToSlackOperator types <operators/sql_to_slack>
+
 .. toctree::
     :maxdepth: 1
     :caption: References
diff --git a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
new file mode 100644
index 0000000000..18f62f7616
--- /dev/null
+++ b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
@@ -0,0 +1,38 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:SqlToSlackOperator:
+
+SqlToSlackOperator
+========================
+
+Use the :class:`~airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator` to post messages to a
+predefined Slack channel.
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+This operator executes a custom query on the provided SQL connection and publishes a Slack message that can be
+formatted to contain the resulting dataset (e.g. an ASCII-formatted dataframe).
+
+An example usage of the SqlToSlackOperator is as follows:
+
+.. exampleinclude:: /../../tests/system/providers/slack/example_sql_to_slack.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_sql_to_slack]
+    :end-before: [END howto_operator_sql_to_slack]
diff --git a/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py b/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py
index 5c3c6dd89b..d4fe0ea78d 100755
--- a/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py
+++ b/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py
@@ -28,11 +28,11 @@ if __name__ not in ("__main__", "__mp_main__"):
         f"To run this script, run the ./{__file__} command [FILE] ..."
     )
 
-
 console = Console(color_system="standard", width=200)
 
 errors: List[str] = []
 
+SKIP_COMP_CHECK = "# ignore airflow compat check"
 TRY_NUM_MATCHER = re.compile(r".*context.*\[[\"']try_number[\"']].*")
 GET_MANDATORY_MATCHER = re.compile(r".*conf\.get_mandatory_value")
 GET_AIRFLOW_APP_MATCHER = re.compile(r".*get_airflow_app\(\)")
@@ -43,6 +43,9 @@ def _check_file(_file: Path):
     lines = _file.read_text().splitlines()
 
     for index, line in enumerate(lines):
+        if SKIP_COMP_CHECK in line:
+            continue
+
         if "XCom.get_value(" in line:
             if "if ti_key is not None:" not in lines[index - 1]:
                 errors.append(
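
The new SKIP_COMP_CHECK marker lets a single line opt out of the 2.2-compatibility check; the transfer
module above attaches it to the get_hook(hook_params=...) call, which only exists on Airflow >= 2.3. A
self-contained toy of the skip logic (the print is a stand-in for the errors.append(...) in the real
script, and the sample lines are made up):

    SKIP_COMP_CHECK = "# ignore airflow compat check"

    lines = [
        "result = XCom.get_value(key='k', ti_key=ti_key)",  # not guarded by "if ti_key is not None:"
        "hook = conn.get_hook(hook_params=params)  # ignore airflow compat check",
    ]
    for index, line in enumerate(lines):
        if SKIP_COMP_CHECK in line:
            continue  # the marker suppresses every compatibility check for this line
        if "XCom.get_value(" in line and "if ti_key is not None:" not in lines[index - 1]:
            print(f"line {index + 1} would be flagged")
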
diff --git a/tests/providers/presto/transfers/test_presto_to_slack.py b/tests/providers/presto/transfers/test_presto_to_slack.py
index 78aa2867ec..bcc5e82a8a 100644
--- a/tests/providers/presto/transfers/test_presto_to_slack.py
+++ b/tests/providers/presto/transfers/test_presto_to_slack.py
@@ -41,9 +41,8 @@ class TestPrestoToSlackOperator:
         operator = PrestoToSlackOperator(task_id=TEST_DAG_ID, **kwargs)
         return operator
 
-    @mock.patch('airflow.providers.presto.transfers.presto_to_slack.PrestoHook')
-    @mock.patch('airflow.providers.presto.transfers.presto_to_slack.SlackWebhookHook')
-    def test_hooks_and_rendering(self, mock_slack_hook_class, mock_presto_hook_class):
+    @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+    def test_hooks_and_rendering_with_slack_conn(self, mock_slack_hook_class):
         operator_args = {
             'presto_conn_id': 'presto_connection',
             'slack_conn_id': 'slack_connection',
@@ -51,27 +50,56 @@ class TestPrestoToSlackOperator:
             'results_df_name': 'xxxx',
             'parameters': ['1', '2', '3'],
             'slack_message': 'message: {{ ds }}, {{ xxxx }}',
-            'slack_token': 'test_token',
             'slack_channel': 'my_channel',
             'dag': self.example_dag,
         }
         presto_to_slack_operator = self._construct_operator(**operator_args)
-        presto_hook = mock_presto_hook_class.return_value
-        presto_hook.get_pandas_df.return_value = '1234'
+        mock_dbapi_hook = mock.Mock()
+        presto_to_slack_operator._get_hook = mock_dbapi_hook
+
+        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+        get_pandas_df_mock.return_value = '1234'
+
         slack_webhook_hook = mock_slack_hook_class.return_value
         presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
-        mock_presto_hook_class.assert_called_once_with(
-            presto_conn_id='presto_connection',
+        mock_slack_hook_class.assert_called_once_with(
+            http_conn_id='slack_connection',
+            message='message: 2022-01-01, 1234',
+            channel='my_channel',
+            webhook_token=None,
         )
 
-        presto_hook.get_pandas_df.assert_called_once_with('sql 2022-01-01', parameters=['1', '2', '3'])
+        slack_webhook_hook.execute.assert_called_once()
+
+    @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+    def test_hooks_and_rendering_with_slack_conn_and_webhook(self, mock_slack_hook_class):
+        operator_args = {
+            'presto_conn_id': 'presto_connection',
+            'slack_conn_id': 'slack_connection',
+            'slack_token': 'test_token',
+            'sql': "sql {{ ds }}",
+            'results_df_name': 'xxxx',
+            'parameters': ['1', '2', '3'],
+            'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+            'slack_channel': 'my_channel',
+            'dag': self.example_dag,
+        }
+        presto_to_slack_operator = self._construct_operator(**operator_args)
+        mock_dbapi_hook = mock.Mock()
+        presto_to_slack_operator._get_hook = mock_dbapi_hook
+
+        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+        get_pandas_df_mock.return_value = '1234'
+
+        slack_webhook_hook = mock_slack_hook_class.return_value
+        presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         mock_slack_hook_class.assert_called_once_with(
             http_conn_id='slack_connection',
             message='message: 2022-01-01, 1234',
+            channel='my_channel',
             webhook_token='test_token',
-            slack_channel='my_channel',
         )
 
         slack_webhook_hook.execute.assert_called_once()
diff --git a/tests/providers/slack/transfers/__init__.py b/tests/providers/slack/transfers/__init__.py
new file mode 100644
index 0000000000..217e5db960
--- /dev/null
+++ b/tests/providers/slack/transfers/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py
new file mode 100644
index 0000000000..0390a56b18
--- /dev/null
+++ b/tests/providers/slack/transfers/test_sql_to_slack.py
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+import pandas as pd
+import pytest
+
+from airflow import AirflowException
+from airflow.models import DAG, Connection
+from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
+from airflow.utils import timezone
+
+TEST_DAG_ID = 'sql_to_slack_unit_test'
+TEST_TASK_ID = 'sql_to_slack_unit_test_task'
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+
+class TestSqlToSlackOperator:
+    def setup_method(self):
+        self.example_dag = DAG(TEST_DAG_ID, start_date=DEFAULT_DATE)
+
+    @staticmethod
+    def _construct_operator(**kwargs):
+        operator = SqlToSlackOperator(task_id=TEST_TASK_ID, **kwargs)
+        return operator
+
+    @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+    def test_rendering_and_message_execution(self, mock_slack_hook_class):
+        mock_dbapi_hook = mock.Mock()
+
+        test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1])
+        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+        get_pandas_df_mock.return_value = test_df
+
+        operator_args = {
+            'sql_conn_id': 'snowflake_connection',
+            'slack_conn_id': 'slack_connection',
+            'slack_message': 'message: {{ ds }}, {{ results_df }}',
+            'slack_channel': '#test',
+            'sql': "sql {{ ds }}",
+            'dag': self.example_dag,
+        }
+        sql_to_slack_operator = self._construct_operator(**operator_args)
+
+        slack_webhook_hook = mock_slack_hook_class.return_value
+        sql_to_slack_operator._get_hook = mock_dbapi_hook
+        sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # Test that the Slack hook is instantiated with the right parameters
+        mock_slack_hook_class.assert_called_once_with(
+            http_conn_id='slack_connection',
+            message=f'message: 2017-01-01, {test_df}',
+            channel='#test',
+            webhook_token=None,
+        )
+
+        # Test that the Slack hook's execute method gets run once
+        slack_webhook_hook.execute.assert_called_once()
+
+    @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+    def test_rendering_and_message_execution_with_slack_hook(self, mock_slack_hook_class):
+        mock_dbapi_hook = mock.Mock()
+
+        test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1])
+        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+        get_pandas_df_mock.return_value = test_df
+
+        operator_args = {
+            'sql_conn_id': 'snowflake_connection',
+            'slack_conn_id': 'slack_connection',
+            'slack_webhook_token': 'test_token',
+            'slack_message': 'message: {{ ds }}, {{ results_df }}',
+            'slack_channel': '#test',
+            'sql': "sql {{ ds }}",
+            'dag': self.example_dag,
+        }
+        sql_to_slack_operator = self._construct_operator(**operator_args)
+
+        slack_webhook_hook = mock_slack_hook_class.return_value
+        sql_to_slack_operator._get_hook = mock_dbapi_hook
+        sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # Test that the Slack hook is instantiated with the right parameters
+        mock_slack_hook_class.assert_called_once_with(
+            http_conn_id='slack_connection',
+            message=f'message: 2017-01-01, {test_df}',
+            channel='#test',
+            webhook_token='test_token',
+        )
+
+        # Test that the Slack hook's execute method gets run once
+        slack_webhook_hook.execute.assert_called_once()
+
+    def test_non_existing_slack_parameters_provided_exception_thrown(self):
+        operator_args = {
+            'sql_conn_id': 'snowflake_connection',
+            'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+            'sql': "sql {{ ds }}",
+        }
+        with pytest.raises(AirflowException):
+            self._construct_operator(**operator_args)
+
+    @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+    def test_rendering_custom_df_name_message_execution(self, mock_slack_hook_class):
+        mock_dbapi_hook = mock.Mock()
+
+        test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1])
+        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+        get_pandas_df_mock.return_value = test_df
+
+        operator_args = {
+            'sql_conn_id': 'snowflake_connection',
+            'slack_conn_id': 'slack_connection',
+            'slack_message': 'message: {{ ds }}, {{ testing }}',
+            'slack_channel': '#test',
+            'sql': "sql {{ ds }}",
+            'results_df_name': 'testing',
+            'dag': self.example_dag,
+        }
+        sql_to_slack_operator = self._construct_operator(**operator_args)
+
+        slack_webhook_hook = mock_slack_hook_class.return_value
+        sql_to_slack_operator._get_hook = mock_dbapi_hook
+        sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        # Test that the Slack hook is instantiated with the right parameters
+        mock_slack_hook_class.assert_called_once_with(
+            http_conn_id='slack_connection',
+            message=f'message: 2017-01-01, {test_df}',
+            channel='#test',
+            webhook_token=None,
+        )
+
+        # Test that the Slack hook's execute method gets run once
+        slack_webhook_hook.execute.assert_called_once()
+
+    @mock.patch('airflow.operators.sql.BaseHook.get_connection')
+    def test_hook_params_building(self, mock_get_conn):
+        mock_get_conn.return_value = Connection(conn_id='snowflake_connection', conn_type='snowflake')
+        hook_params = {
+            'schema': 'test_schema',
+            'role': 'test_role',
+            'database': 'test_database',
+            'warehouse': 'test_warehouse',
+        }
+        operator_args = {
+            'sql_conn_id': 'dummy_connection',
+            'sql': "sql {{ ds }}",
+            'results_df_name': 'xxxx',
+            'sql_hook_params': hook_params,
+            'parameters': ['1', '2', '3'],
+            'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+            'slack_webhook_token': 'test_token',
+            'dag': self.example_dag,
+        }
+        sql_to_slack_operator = SqlToSlackOperator(task_id=TEST_TASK_ID, **operator_args)
+
+        assert sql_to_slack_operator.sql_hook_params == hook_params
+
+    @mock.patch('airflow.operators.sql.BaseHook.get_connection')
+    def test_hook_params(self, mock_get_conn):
+        mock_get_conn.return_value = Connection(conn_id='postgres_test', conn_type='postgres')
+        op = SqlToSlackOperator(
+            task_id='sql_hook_params',
+            sql_conn_id='postgres_test',
+            slack_webhook_token='slack_token',
+            sql="SELECT 1",
+            slack_message='message: {{ ds }}, {{ xxxx }}',
+            sql_hook_params={
+                'schema': 'public',
+            },
+        )
+        hook = op._get_hook()
+        assert hook.schema == 'public'
+
+    @mock.patch('airflow.operators.sql.BaseHook.get_connection')
+    def test_hook_params_snowflake(self, mock_get_conn):
+        mock_get_conn.return_value = Connection(conn_id='snowflake_default', conn_type='snowflake')
+        op = SqlToSlackOperator(
+            task_id='snowflake_hook_params',
+            sql_conn_id='snowflake_default',
+            slack_conn_id='slack_default',
+            results_df_name='xxxx',
+            sql="SELECT 1",
+            slack_message='message: {{ ds }}, {{ xxxx }}',
+            sql_hook_params={
+                'warehouse': 'warehouse',
+                'database': 'database',
+                'role': 'role',
+                'schema': 'schema',
+            },
+        )
+        hook = op._get_hook()
+
+        assert hook.warehouse == 'warehouse'
+        assert hook.database == 'database'
+        assert hook.role == 'role'
+        assert hook.schema == 'schema'
diff --git a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py
index f4a28bd7cc..8973d1701f 100644
--- a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py
+++ b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py
@@ -20,6 +20,7 @@ from unittest import mock
 from airflow.models import DAG
 from airflow.providers.snowflake.transfers.snowflake_to_slack import SnowflakeToSlackOperator
 from airflow.utils import timezone
+from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_runs
 
 TEST_DAG_ID = 'snowflake_to_slack_unit_test'
@@ -38,15 +39,14 @@ class TestSnowflakeToSlackOperator:
 
     @staticmethod
     def _construct_operator(**kwargs):
-        operator = SnowflakeToSlackOperator(task_id=TEST_DAG_ID, **kwargs)
-        return operator
+        with conf_vars({('operators', 'allow_illegal_arguments'): 'True'}):
+            operator = SnowflakeToSlackOperator(task_id=TEST_DAG_ID, **kwargs)
+            return operator
 
-    @mock.patch('airflow.providers.snowflake.transfers.snowflake_to_slack.SnowflakeHook')
-    @mock.patch('airflow.providers.snowflake.transfers.snowflake_to_slack.SlackWebhookHook')
-    def test_hooks_and_rendering(self, mock_slack_hook_class, mock_snowflake_hook_class):
+    @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+    def test_hooks_and_rendering(self, mock_slack_hook_class):
         operator_args = {
             'snowflake_conn_id': 'snowflake_connection',
-            'slack_conn_id': 'slack_connection',
             'sql': "sql {{ ds }}",
             'results_df_name': 'xxxx',
             'warehouse': 'test_warehouse',
@@ -60,29 +60,75 @@ class TestSnowflakeToSlackOperator:
         }
         snowflake_to_slack_operator = self._construct_operator(**operator_args)
 
-        snowflake_hook = mock_snowflake_hook_class.return_value
-        snowflake_hook.get_pandas_df.return_value = '1234'
-        slack_webhook_hook = mock_slack_hook_class.return_value
+        mock_dbapi_hook = mock.Mock()
+        snowflake_to_slack_operator._get_hook = mock_dbapi_hook
 
-        snowflake_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-        # Test that the Snowflake hook is instantiated with the right parameters
-        mock_snowflake_hook_class.assert_called_once_with(
-            database='test_database',
-            role='test_role',
-            schema='test_schema',
-            snowflake_conn_id='snowflake_connection',
-            warehouse='test_warehouse',
-        )
+        get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+        get_pandas_df_mock.return_value = '1234'
 
-        # Test that the get_pandas_df method is executed on the Snowflake hook with the pre-rendered sql and
-        # correct params
-        snowflake_hook.get_pandas_df.assert_called_once_with('sql 2017-01-01', parameters=['1', '2', '3'])
+        snowflake_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         # Test that the Slack hook is instantiated with the right parameters
         mock_slack_hook_class.assert_called_once_with(
-            http_conn_id='slack_connection', message='message: 2017-01-01, 1234', webhook_token='test_token'
+            message='message: 2017-01-01, 1234',
+            webhook_token='test_token',
+            channel=None,
+            http_conn_id='slack_default',
         )
 
-        # Test that the Slack hook's execute method gets run once
-        slack_webhook_hook.execute.assert_called_once()
+    def test_hook_params_building(self):
+        hook_params = {
+            'schema': 'test_schema',
+            'role': 'test_role',
+            'database': 'test_database',
+            'warehouse': 'test_warehouse',
+        }
+        operator_args = {
+            'snowflake_conn_id': 'snowflake_connection',
+            'sql': "sql {{ ds }}",
+            'results_df_name': 'xxxx',
+            'warehouse': hook_params['warehouse'],
+            'database': hook_params['database'],
+            'role': hook_params['role'],
+            'schema': hook_params['schema'],
+            'parameters': ['1', '2', '3'],
+            'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+            'slack_token': 'test_token',
+            'dag': self.example_dag,
+        }
+        snowflake_operator = self._construct_operator(**operator_args)
+
+        assert snowflake_operator.sql_hook_params == hook_params
+
+    def test_partial_hook_params_building(self):
+        hook_params = {'role': 'test_role', 'database': 'test_database', 'warehouse': 'test_warehouse'}
+        operator_args = {
+            'snowflake_conn_id': 'snowflake_connection',
+            'sql': "sql {{ ds }}",
+            'results_df_name': 'xxxx',
+            'warehouse': hook_params['warehouse'],
+            'database': hook_params['database'],
+            'role': hook_params['role'],
+            'schema': None,
+            'parameters': ['1', '2', '3'],
+            'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+            'slack_token': 'test_token',
+            'dag': self.example_dag,
+        }
+        snowflake_operator = self._construct_operator(**operator_args)
+
+        assert snowflake_operator.sql_hook_params == hook_params
+
+    def test_no_hook_params_building(self):
+        operator_args = {
+            'snowflake_conn_id': 'snowflake_connection',
+            'sql': "sql {{ ds }}",
+            'results_df_name': 'xxxx',
+            'parameters': ['1', '2', '3'],
+            'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+            'slack_token': 'test_token',
+            'dag': self.example_dag,
+        }
+        snowflake_operator = self._construct_operator(**operator_args)
+
+        assert snowflake_operator.sql_hook_params == {}
diff --git a/tests/system/providers/presto/example_presto_to_slack.py b/tests/system/providers/presto/example_presto_to_slack.py
index 91ab9c42e6..dc87c831b4 100644
--- a/tests/system/providers/presto/example_presto_to_slack.py
+++ b/tests/system/providers/presto/example_presto_to_slack.py
@@ -28,6 +28,7 @@ from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOper
 PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table")
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "example_presto_to_slack"
+SLACK_CONN_WEBHOOK = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
 
 with models.DAG(
     dag_id=DAG_ID,
@@ -39,6 +40,7 @@ with models.DAG(
     # [START howto_operator_presto_to_slack]
     PrestoToSlackOperator(
         task_id="presto_to_slack",
+        slack_token=SLACK_CONN_WEBHOOK,
         sql=f"SELECT col FROM {PRESTO_TABLE}",
         slack_channel="my_channel",
         slack_message="message: {{ ds }}, {{ results_df }}",
diff --git a/tests/system/providers/slack/__init__.py b/tests/system/providers/slack/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/slack/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/presto/example_presto_to_slack.py b/tests/system/providers/slack/example_sql_to_slack.py
similarity index 75%
copy from tests/system/providers/presto/example_presto_to_slack.py
copy to tests/system/providers/slack/example_sql_to_slack.py
index 91ab9c42e6..afdf3bcf62 100644
--- a/tests/system/providers/presto/example_presto_to_slack.py
+++ b/tests/system/providers/slack/example_sql_to_slack.py
@@ -16,18 +16,19 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example DAG using PrestoToSlackOperator.
+Example DAG using SqlToSlackOperator.
 """
 
 import os
 from datetime import datetime
 
 from airflow import models
-from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOperator
+from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
 
-PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table")
+SQL_TABLE = os.environ.get("SQL_TABLE", "test_table")
+SQL_CONN_ID = 'presto_default'
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_presto_to_slack"
+DAG_ID = "example_sql_to_slack"
 
 with models.DAG(
     dag_id=DAG_ID,
@@ -36,14 +37,16 @@ with models.DAG(
     catchup=False,
     tags=["example"],
 ) as dag:
-    # [START howto_operator_presto_to_slack]
-    PrestoToSlackOperator(
+    # [START howto_operator_sql_to_slack]
+    SqlToSlackOperator(
         task_id="presto_to_slack",
-        sql=f"SELECT col FROM {PRESTO_TABLE}",
+        sql_conn_id=SQL_CONN_ID,
+        sql=f"SELECT col FROM {SQL_TABLE}",
         slack_channel="my_channel",
+        slack_conn_id='slack_default',
         slack_message="message: {{ ds }}, {{ results_df }}",
     )
-    # [END howto_operator_presto_to_slack]
+    # [END howto_operator_sql_to_slack]
 
 
 from tests.system.utils import get_test_run  # noqa: E402