You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2022/06/29 12:28:22 UTC
[airflow] branch main updated: Adding generic `SqlToSlackOperator` (#24663)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 13908c2c91 Adding generic `SqlToSlackOperator` (#24663)
13908c2c91 is described below
commit 13908c2c914cf08f9d962a4d3b6ae54fbdf1d223
Author: Alex Kruchkov <36...@users.noreply.github.com>
AuthorDate: Wed Jun 29 15:28:07 2022 +0300
Adding generic `SqlToSlackOperator` (#24663)
* adding `SqlToSlackOperator`
Co-authored-by: eladkal <45...@users.noreply.github.com>
---
.../providers/presto/transfers/presto_to_slack.py | 87 ++-------
airflow/providers/slack/provider.yaml | 6 +
airflow/providers/slack/transfers/__init__.py | 16 ++
.../transfers/sql_to_slack.py} | 122 ++++++++----
.../snowflake/transfers/snowflake_to_slack.py | 103 +++-------
docs/apache-airflow-providers-slack/index.rst | 10 +
.../operators/sql_to_slack.rst | 38 ++++
.../pre_commit_check_2_2_compatibility.py | 5 +-
.../presto/transfers/test_presto_to_slack.py | 48 ++++-
tests/providers/slack/transfers/__init__.py | 17 ++
.../providers/slack/transfers/test_sql_to_slack.py | 213 +++++++++++++++++++++
.../snowflake/transfers/test_snowflake_to_slack.py | 96 +++++++---
.../providers/presto/example_presto_to_slack.py | 2 +
tests/system/providers/slack/__init__.py | 16 ++
.../example_sql_to_slack.py} | 19 +-
15 files changed, 575 insertions(+), 223 deletions(-)
diff --git a/airflow/providers/presto/transfers/presto_to_slack.py b/airflow/providers/presto/transfers/presto_to_slack.py
index 6dd0ecb3ab..bcf2c3177a 100644
--- a/airflow/providers/presto/transfers/presto_to_slack.py
+++ b/airflow/providers/presto/transfers/presto_to_slack.py
@@ -15,21 +15,13 @@
# specific language governing permissions and limitations
# under the License.
-from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union
+import warnings
+from typing import Iterable, Mapping, Optional, Sequence, Union
-from pandas import DataFrame
-from tabulate import tabulate
+from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
-from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
-from airflow.providers.presto.hooks.presto import PrestoHook
-from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
-if TYPE_CHECKING:
- from airflow.utils.context import Context
-
-
-class PrestoToSlackOperator(BaseOperator):
+class PrestoToSlackOperator(SqlToSlackOperator):
"""
Executes a single SQL statement in Presto and sends the results to Slack. The results of the query are
rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
@@ -73,8 +65,6 @@ class PrestoToSlackOperator(BaseOperator):
slack_channel: Optional[str] = None,
**kwargs,
) -> None:
- super().__init__(**kwargs)
-
self.presto_conn_id = presto_conn_id
self.sql = sql
self.parameters = parameters
@@ -84,58 +74,23 @@ class PrestoToSlackOperator(BaseOperator):
self.results_df_name = results_df_name
self.slack_channel = slack_channel
- def _get_query_results(self) -> DataFrame:
- presto_hook = self._get_presto_hook()
-
- self.log.info('Running SQL query: %s', self.sql)
- df = presto_hook.get_pandas_df(self.sql, parameters=self.parameters)
- return df
-
- def _render_and_send_slack_message(self, context, df) -> None:
- # Put the dataframe into the context and render the JINJA template fields
- context[self.results_df_name] = df
- self.render_template_fields(context)
-
- slack_hook = self._get_slack_hook()
- self.log.info('Sending slack message: %s', self.slack_message)
- slack_hook.execute()
-
- def _get_presto_hook(self) -> PrestoHook:
- return PrestoHook(presto_conn_id=self.presto_conn_id)
+ warnings.warn(
+ """
+ PrestoToSlackOperator is deprecated.
+ Please use `airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator`.
+ """,
+ DeprecationWarning,
+ stacklevel=2,
+ )
- def _get_slack_hook(self) -> SlackWebhookHook:
- return SlackWebhookHook(
- http_conn_id=self.slack_conn_id,
- message=self.slack_message,
- webhook_token=self.slack_token,
+ super().__init__(
+ sql=self.sql,
+ sql_conn_id=self.presto_conn_id,
+ slack_conn_id=self.slack_conn_id,
+ slack_webhook_token=self.slack_token,
+ slack_message=self.slack_message,
slack_channel=self.slack_channel,
+ results_df_name=self.results_df_name,
+ parameters=self.parameters,
+ **kwargs,
)
-
- def render_template_fields(self, context, jinja_env=None) -> None:
- # If this is the first render of the template fields, exclude slack_message from rendering since
- # the presto results haven't been retrieved yet.
- if self.times_rendered == 0:
- fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields)
- else:
- fields_to_render = self.template_fields
-
- if not jinja_env:
- jinja_env = self.get_template_env()
-
- # Add the tabulate library into the JINJA environment
- jinja_env.filters['tabulate'] = tabulate
-
- self._do_render_template_fields(self, fields_to_render, context, jinja_env, set())
- self.times_rendered += 1
-
- def execute(self, context: 'Context') -> None:
- if not self.sql.strip():
- raise AirflowException("Expected 'sql' parameter is missing.")
- if not self.slack_message.strip():
- raise AirflowException("Expected 'slack_message' parameter is missing.")
-
- df = self._get_query_results()
-
- self._render_and_send_slack_message(context, df)
-
- self.log.debug('Finished sending Presto data to Slack')
diff --git a/airflow/providers/slack/provider.yaml b/airflow/providers/slack/provider.yaml
index f54d70710c..cfd1ef89b7 100644
--- a/airflow/providers/slack/provider.yaml
+++ b/airflow/providers/slack/provider.yaml
@@ -57,6 +57,12 @@ hooks:
- airflow.providers.slack.hooks.slack
- airflow.providers.slack.hooks.slack_webhook
+transfers:
+ - source-integration-name: SQL
+ target-integration-name: Slack
+ python-module: airflow.providers.slack.transfers.sql_to_slack
+ how-to-guide: /docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
+
connection-types:
- hook-class-name: airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook
connection-type: slackwebhook
diff --git a/airflow/providers/slack/transfers/__init__.py b/airflow/providers/slack/transfers/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/slack/transfers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/snowflake/transfers/snowflake_to_slack.py b/airflow/providers/slack/transfers/sql_to_slack.py
similarity index 54%
copy from airflow/providers/snowflake/transfers/snowflake_to_slack.py
copy to airflow/providers/slack/transfers/sql_to_slack.py
index 2c6138e58e..24a3ed93c6 100644
--- a/airflow/providers/snowflake/transfers/snowflake_to_slack.py
+++ b/airflow/providers/slack/transfers/sql_to_slack.py
@@ -14,49 +14,76 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import warnings
from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union
from pandas import DataFrame
from tabulate import tabulate
from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.hooks.dbapi import DbApiHook
from airflow.models import BaseOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
-from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
+from airflow.providers_manager import ProvidersManager
+from airflow.utils.module_loading import import_string
+from airflow.version import version
if TYPE_CHECKING:
from airflow.utils.context import Context
-class SnowflakeToSlackOperator(BaseOperator):
+def _backported_get_hook(connection, *, hook_params=None):
+ """Return hook based on conn_type
+ To support Airflow versions < 2.3, we backport the "get_hook()" method. This should be removed
+ once "apache-airflow-providers-slack" depends on Airflow >= 2.3.
+ """
+ hook = ProvidersManager().hooks.get(connection.conn_type, None)
+
+ if hook is None:
+ raise AirflowException(f'Unknown hook type "{connection.conn_type}"')
+ try:
+ hook_class = import_string(hook.hook_class_name)
+ except ImportError:
+ warnings.warn(
+ "Could not import %s when discovering %s %s",
+ hook.hook_class_name,
+ hook.hook_name,
+ hook.package_name,
+ )
+ raise
+ if hook_params is None:
+ hook_params = {}
+ return hook_class(**{hook.connection_id_attribute_name: connection.conn_id}, **hook_params)
+
+
+class SqlToSlackOperator(BaseOperator):
"""
- Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are
- rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
- results_df }}'. The 'results_df' variable name can be changed by specifying a different
+ Executes an SQL statement in a given SQL connection and sends the results to Slack. The results of the
+ query are rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called
+ '{{ results_df }}'. The 'results_df' variable name can be changed by specifying a different
'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to
allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df |
tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table.
.. seealso::
For more information on how to use this operator, take a look at the guide:
- :ref:`howto/operator:SnowflakeToSlackOperator`
+ :ref:`howto/operator:SqlToSlackOperator`
:param sql: The SQL statement to execute on Snowflake (templated)
:param slack_message: The templated Slack message to send with the data returned from Snowflake.
You can use the default JINJA variable {{ results_df }} to access the pandas dataframe containing the
SQL results
- :param snowflake_conn_id: Reference to
+ :param sql_conn_id: Reference to
:ref:`Snowflake connection id<howto/connection:snowflake>`
- :param slack_conn_id: The connection id for Slack
+ :param sql_hook_params: Extra config params to be passed to the underlying hook.
+ Should match the desired hook constructor params.
+ :param slack_conn_id: The connection id for Slack.
+ :param slack_webhook_token: The token to use to authenticate to Slack. If this is not provided, the
+ 'slack_conn_id' attribute needs to be specified in the 'password' field.
+ :param slack_channel: The channel to send the message to. Overrides the default from the Slack connection.
:param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df'
:param parameters: The parameters to pass to the SQL query
- :param warehouse: The Snowflake virtual warehouse to use to run the SQL query
- :param database: The Snowflake database to use for the SQL query
- :param schema: The schema to run the SQL against in Snowflake
- :param role: The role to use when connecting to Snowflake
- :param slack_token: The token to use to authenticate to Slack. If this is not provided, the
- 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id
"""
template_fields: Sequence[str] = ('sql', 'slack_message')
@@ -68,37 +95,56 @@ class SnowflakeToSlackOperator(BaseOperator):
self,
*,
sql: str,
+ sql_conn_id: str,
+ sql_hook_params: Optional[dict] = None,
+ slack_conn_id: Optional[str] = None,
+ slack_webhook_token: Optional[str] = None,
+ slack_channel: Optional[str] = None,
slack_message: str,
- snowflake_conn_id: str = 'snowflake_default',
- slack_conn_id: str = 'slack_default',
results_df_name: str = 'results_df',
parameters: Optional[Union[Iterable, Mapping]] = None,
- warehouse: Optional[str] = None,
- database: Optional[str] = None,
- schema: Optional[str] = None,
- role: Optional[str] = None,
- slack_token: Optional[str] = None,
**kwargs,
) -> None:
+
super().__init__(**kwargs)
- self.snowflake_conn_id = snowflake_conn_id
+ self.sql_conn_id = sql_conn_id
+ self.sql_hook_params = sql_hook_params
self.sql = sql
self.parameters = parameters
- self.warehouse = warehouse
- self.database = database
- self.schema = schema
- self.role = role
self.slack_conn_id = slack_conn_id
- self.slack_token = slack_token
+ self.slack_webhook_token = slack_webhook_token
+ self.slack_channel = slack_channel
self.slack_message = slack_message
self.results_df_name = results_df_name
+ self.kwargs = kwargs
+
+ if not self.slack_conn_id and not self.slack_webhook_token:
+ raise AirflowException(
+ "SqlToSlackOperator requires either a `slack_conn_id` or a `slack_webhook_token` argument"
+ )
+
+ def _get_hook(self) -> DbApiHook:
+ self.log.debug("Get connection for %s", self.sql_conn_id)
+ conn = BaseHook.get_connection(self.sql_conn_id)
+ if version >= '2.3':
+ # "hook_params" were introduced into "get_hook()" only in Airflow 2.3.
+ hook = conn.get_hook(hook_params=self.sql_hook_params) # ignore airflow compat check
+ else:
+ # For supporting Airflow versions < 2.3, we backport "get_hook()" method. This should be removed
+ # when "apache-airflow-providers-slack" will depend on Airflow >= 2.3.
+ hook = _backported_get_hook(conn, hook_params=self.sql_hook_params)
+ if not callable(getattr(hook, 'get_pandas_df', None)):
+ raise AirflowException(
+ "This hook is not supported. The hook class must have get_pandas_df method."
+ )
+ return hook
def _get_query_results(self) -> DataFrame:
- snowflake_hook = self._get_snowflake_hook()
+ sql_hook = self._get_hook()
self.log.info('Running SQL query: %s', self.sql)
- df = snowflake_hook.get_pandas_df(self.sql, parameters=self.parameters)
+ df = sql_hook.get_pandas_df(self.sql, parameters=self.parameters)
return df
def _render_and_send_slack_message(self, context, df) -> None:
@@ -110,18 +156,12 @@ class SnowflakeToSlackOperator(BaseOperator):
self.log.info('Sending slack message: %s', self.slack_message)
slack_hook.execute()
- def _get_snowflake_hook(self) -> SnowflakeHook:
- return SnowflakeHook(
- snowflake_conn_id=self.snowflake_conn_id,
- warehouse=self.warehouse,
- database=self.database,
- role=self.role,
- schema=self.schema,
- )
-
def _get_slack_hook(self) -> SlackWebhookHook:
return SlackWebhookHook(
- http_conn_id=self.slack_conn_id, message=self.slack_message, webhook_token=self.slack_token
+ http_conn_id=self.slack_conn_id,
+ message=self.slack_message,
+ channel=self.slack_channel,
+ webhook_token=self.slack_webhook_token,
)
def render_template_fields(self, context, jinja_env=None) -> None:
@@ -152,4 +192,4 @@ class SnowflakeToSlackOperator(BaseOperator):
df = self._get_query_results()
self._render_and_send_slack_message(context, df)
- self.log.debug('Finished sending Snowflake data to Slack')
+ self.log.debug('Finished sending SQL data to Slack')
diff --git a/airflow/providers/snowflake/transfers/snowflake_to_slack.py b/airflow/providers/snowflake/transfers/snowflake_to_slack.py
index 2c6138e58e..29e0ccbd23 100644
--- a/airflow/providers/snowflake/transfers/snowflake_to_slack.py
+++ b/airflow/providers/snowflake/transfers/snowflake_to_slack.py
@@ -15,21 +15,13 @@
# specific language governing permissions and limitations
# under the License.
-from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union
+import warnings
+from typing import Iterable, Mapping, Optional, Sequence, Union
-from pandas import DataFrame
-from tabulate import tabulate
+from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
-from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
-from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
-from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
-if TYPE_CHECKING:
- from airflow.utils.context import Context
-
-
-class SnowflakeToSlackOperator(BaseOperator):
+class SnowflakeToSlackOperator(SqlToSlackOperator):
"""
Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are
rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
@@ -48,7 +40,7 @@ class SnowflakeToSlackOperator(BaseOperator):
SQL results
:param snowflake_conn_id: Reference to
:ref:`Snowflake connection id<howto/connection:snowflake>`
- :param slack_conn_id: The connection id for Slack
+ :param slack_conn_id: The connection id for Slack.
:param results_df_name: The name of the JINJA template's dataframe variable, default is 'results_df'
:param parameters: The parameters to pass to the SQL query
:param warehouse: The Snowflake virtual warehouse to use to run the SQL query
@@ -56,7 +48,7 @@ class SnowflakeToSlackOperator(BaseOperator):
:param schema: The schema to run the SQL against in Snowflake
:param role: The role to use when connecting to Snowflake
:param slack_token: The token to use to authenticate to Slack. If this is not provided, the
- 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id
+ 'webhook_token' attribute needs to be specified in the 'Extra' JSON field against the slack_conn_id.
"""
template_fields: Sequence[str] = ('sql', 'slack_message')
@@ -80,8 +72,6 @@ class SnowflakeToSlackOperator(BaseOperator):
slack_token: Optional[str] = None,
**kwargs,
) -> None:
- super().__init__(**kwargs)
-
self.snowflake_conn_id = snowflake_conn_id
self.sql = sql
self.parameters = parameters
@@ -94,62 +84,31 @@ class SnowflakeToSlackOperator(BaseOperator):
self.slack_message = slack_message
self.results_df_name = results_df_name
- def _get_query_results(self) -> DataFrame:
- snowflake_hook = self._get_snowflake_hook()
-
- self.log.info('Running SQL query: %s', self.sql)
- df = snowflake_hook.get_pandas_df(self.sql, parameters=self.parameters)
- return df
-
- def _render_and_send_slack_message(self, context, df) -> None:
- # Put the dataframe into the context and render the JINJA template fields
- context[self.results_df_name] = df
- self.render_template_fields(context)
-
- slack_hook = self._get_slack_hook()
- self.log.info('Sending slack message: %s', self.slack_message)
- slack_hook.execute()
-
- def _get_snowflake_hook(self) -> SnowflakeHook:
- return SnowflakeHook(
- snowflake_conn_id=self.snowflake_conn_id,
- warehouse=self.warehouse,
- database=self.database,
- role=self.role,
- schema=self.schema,
+ warnings.warn(
+ """
+ SnowflakeToSlackOperator is deprecated.
+ Please use `airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator`.
+ """,
+ DeprecationWarning,
+ stacklevel=2,
)
- def _get_slack_hook(self) -> SlackWebhookHook:
- return SlackWebhookHook(
- http_conn_id=self.slack_conn_id, message=self.slack_message, webhook_token=self.slack_token
+ hook_params = {
+ "schema": self.schema,
+ "role": self.role,
+ "database": self.database,
+ "warehouse": self.warehouse,
+ }
+ cleaned_hook_params = {k: v for k, v in hook_params.items() if v is not None}
+
+ super().__init__(
+ sql=self.sql,
+ sql_conn_id=self.snowflake_conn_id,
+ slack_conn_id=self.slack_conn_id,
+ slack_webhook_token=self.slack_token,
+ slack_message=self.slack_message,
+ results_df_name=self.results_df_name,
+ parameters=self.parameters,
+ sql_hook_params=cleaned_hook_params,
+ **kwargs,
)
-
- def render_template_fields(self, context, jinja_env=None) -> None:
- # If this is the first render of the template fields, exclude slack_message from rendering since
- # the snowflake results haven't been retrieved yet.
- if self.times_rendered == 0:
- fields_to_render: Iterable[str] = filter(lambda x: x != 'slack_message', self.template_fields)
- else:
- fields_to_render = self.template_fields
-
- if not jinja_env:
- jinja_env = self.get_template_env()
-
- # Add the tabulate library into the JINJA environment
- jinja_env.filters['tabulate'] = tabulate
-
- self._do_render_template_fields(self, fields_to_render, context, jinja_env, set())
- self.times_rendered += 1
-
- def execute(self, context: 'Context') -> None:
- if not isinstance(self.sql, str):
- raise AirflowException("Expected 'sql' parameter should be a string.")
- if self.sql is None or self.sql.strip() == "":
- raise AirflowException("Expected 'sql' parameter is missing.")
- if self.slack_message is None or self.slack_message.strip() == "":
- raise AirflowException("Expected 'slack_message' parameter is missing.")
-
- df = self._get_query_results()
- self._render_and_send_slack_message(context, df)
-
- self.log.debug('Finished sending Snowflake data to Slack')
diff --git a/docs/apache-airflow-providers-slack/index.rst b/docs/apache-airflow-providers-slack/index.rst
index 87b299c2f7..eb833733a8 100644
--- a/docs/apache-airflow-providers-slack/index.rst
+++ b/docs/apache-airflow-providers-slack/index.rst
@@ -21,13 +21,23 @@
Content
-------
+.. toctree::
+ :hidden:
+ :caption: System tests
+ System Tests <_api/tests/system/providers/slack/index>
.. toctree::
:maxdepth: 1
:caption: Guides
How-to Guide <operators/slack_operator_howto_guide>
+.. toctree::
+ :maxdepth: 1
+ :caption: Guides
+
+ SqlToSlackOperator types <operators/sql_to_slack>
+
.. toctree::
:maxdepth: 1
:caption: References
diff --git a/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
new file mode 100644
index 0000000000..18f62f7616
--- /dev/null
+++ b/docs/apache-airflow-providers-slack/operators/sql_to_slack.rst
@@ -0,0 +1,38 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _howto/operator:SqlToSlackOperator:
+
+SqlToSlackOperator
+========================
+
+Use the :class:`~airflow.providers.slack.transfers.sql_to_slack.SqlToSlackOperator` to post messages to a
+predefined Slack channel.
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+This operator will execute a custom query in the provided SQL connection and publish a Slack message that can be formatted
+and contain the resulting dataset (e.g. ASCII formatted dataframe).
+
+An example usage of the SqlToSlackOperator is as follows:
+
+.. exampleinclude:: /../../tests/system/providers/slack/example_sql_to_slack.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_sql_to_slack]
+ :end-before: [END howto_operator_sql_to_slack]
diff --git a/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py b/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py
index 5c3c6dd89b..d4fe0ea78d 100755
--- a/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py
+++ b/scripts/ci/pre_commit/pre_commit_check_2_2_compatibility.py
@@ -28,11 +28,11 @@ if __name__ not in ("__main__", "__mp_main__"):
f"To run this script, run the ./{__file__} command [FILE] ..."
)
-
console = Console(color_system="standard", width=200)
errors: List[str] = []
+SKIP_COMP_CHECK = "# ignore airflow compat check"
TRY_NUM_MATCHER = re.compile(r".*context.*\[[\"']try_number[\"']].*")
GET_MANDATORY_MATCHER = re.compile(r".*conf\.get_mandatory_value")
GET_AIRFLOW_APP_MATCHER = re.compile(r".*get_airflow_app\(\)")
@@ -43,6 +43,9 @@ def _check_file(_file: Path):
lines = _file.read_text().splitlines()
for index, line in enumerate(lines):
+ if SKIP_COMP_CHECK in line:
+ continue
+
if "XCom.get_value(" in line:
if "if ti_key is not None:" not in lines[index - 1]:
errors.append(
diff --git a/tests/providers/presto/transfers/test_presto_to_slack.py b/tests/providers/presto/transfers/test_presto_to_slack.py
index 78aa2867ec..bcc5e82a8a 100644
--- a/tests/providers/presto/transfers/test_presto_to_slack.py
+++ b/tests/providers/presto/transfers/test_presto_to_slack.py
@@ -41,9 +41,8 @@ class TestPrestoToSlackOperator:
operator = PrestoToSlackOperator(task_id=TEST_DAG_ID, **kwargs)
return operator
- @mock.patch('airflow.providers.presto.transfers.presto_to_slack.PrestoHook')
- @mock.patch('airflow.providers.presto.transfers.presto_to_slack.SlackWebhookHook')
- def test_hooks_and_rendering(self, mock_slack_hook_class, mock_presto_hook_class):
+ @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+ def test_hooks_and_rendering_with_slack_conn(self, mock_slack_hook_class):
operator_args = {
'presto_conn_id': 'presto_connection',
'slack_conn_id': 'slack_connection',
@@ -51,27 +50,56 @@ class TestPrestoToSlackOperator:
'results_df_name': 'xxxx',
'parameters': ['1', '2', '3'],
'slack_message': 'message: {{ ds }}, {{ xxxx }}',
- 'slack_token': 'test_token',
'slack_channel': 'my_channel',
'dag': self.example_dag,
}
presto_to_slack_operator = self._construct_operator(**operator_args)
- presto_hook = mock_presto_hook_class.return_value
- presto_hook.get_pandas_df.return_value = '1234'
+ mock_dbapi_hook = mock.Mock()
+ presto_to_slack_operator._get_hook = mock_dbapi_hook
+
+ get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+ get_pandas_df_mock.return_value = '1234'
+
slack_webhook_hook = mock_slack_hook_class.return_value
presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
- mock_presto_hook_class.assert_called_once_with(
- presto_conn_id='presto_connection',
+ mock_slack_hook_class.assert_called_once_with(
+ http_conn_id='slack_connection',
+ message='message: 2022-01-01, 1234',
+ channel='my_channel',
+ webhook_token=None,
)
- presto_hook.get_pandas_df.assert_called_once_with('sql 2022-01-01', parameters=['1', '2', '3'])
+ slack_webhook_hook.execute.assert_called_once()
+
+ @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+ def test_hooks_and_rendering_with_slack_conn_and_webhook(self, mock_slack_hook_class):
+ operator_args = {
+ 'presto_conn_id': 'presto_connection',
+ 'slack_conn_id': 'slack_connection',
+ 'slack_token': 'test_token',
+ 'sql': "sql {{ ds }}",
+ 'results_df_name': 'xxxx',
+ 'parameters': ['1', '2', '3'],
+ 'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+ 'slack_channel': 'my_channel',
+ 'dag': self.example_dag,
+ }
+ presto_to_slack_operator = self._construct_operator(**operator_args)
+ mock_dbapi_hook = mock.Mock()
+ presto_to_slack_operator._get_hook = mock_dbapi_hook
+
+ get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+ get_pandas_df_mock.return_value = '1234'
+
+ slack_webhook_hook = mock_slack_hook_class.return_value
+ presto_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
mock_slack_hook_class.assert_called_once_with(
http_conn_id='slack_connection',
message='message: 2022-01-01, 1234',
+ channel='my_channel',
webhook_token='test_token',
- slack_channel='my_channel',
)
slack_webhook_hook.execute.assert_called_once()
diff --git a/tests/providers/slack/transfers/__init__.py b/tests/providers/slack/transfers/__init__.py
new file mode 100644
index 0000000000..217e5db960
--- /dev/null
+++ b/tests/providers/slack/transfers/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py b/tests/providers/slack/transfers/test_sql_to_slack.py
new file mode 100644
index 0000000000..0390a56b18
--- /dev/null
+++ b/tests/providers/slack/transfers/test_sql_to_slack.py
@@ -0,0 +1,213 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import mock
+
+import pandas as pd
+import pytest
+
+from airflow import AirflowException
+from airflow.models import DAG, Connection
+from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
+from airflow.utils import timezone
+
+TEST_DAG_ID = 'sql_to_slack_unit_test'
+TEST_TASK_ID = 'sql_to_slack_unit_test_task'
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+
+class TestSqlToSlackOperator:
+ def setup_method(self):
+ self.example_dag = DAG(TEST_DAG_ID, start_date=DEFAULT_DATE)
+
+ @staticmethod
+ def _construct_operator(**kwargs):
+ operator = SqlToSlackOperator(task_id=TEST_TASK_ID, **kwargs)
+ return operator
+
+ @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+ def test_rendering_and_message_execution(self, mock_slack_hook_class):
+ mock_dbapi_hook = mock.Mock()
+
+ test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1])
+ get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+ get_pandas_df_mock.return_value = test_df
+
+ operator_args = {
+ 'sql_conn_id': 'snowflake_connection',
+ 'slack_conn_id': 'slack_connection',
+ 'slack_message': 'message: {{ ds }}, {{ results_df }}',
+ 'slack_channel': '#test',
+ 'sql': "sql {{ ds }}",
+ 'dag': self.example_dag,
+ }
+ sql_to_slack_operator = self._construct_operator(**operator_args)
+
+ slack_webhook_hook = mock_slack_hook_class.return_value
+ sql_to_slack_operator._get_hook = mock_dbapi_hook
+ sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ # Test that the Slack hook is instantiated with the right parameters
+ mock_slack_hook_class.assert_called_once_with(
+ http_conn_id='slack_connection',
+ message=f'message: 2017-01-01, {test_df}',
+ channel='#test',
+ webhook_token=None,
+ )
+
+ # Test that the Slack hook's execute method gets run once
+ slack_webhook_hook.execute.assert_called_once()
+
+ @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+ def test_rendering_and_message_execution_with_slack_hook(self, mock_slack_hook_class):
+ mock_dbapi_hook = mock.Mock()
+
+ test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1])
+ get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+ get_pandas_df_mock.return_value = test_df
+
+ operator_args = {
+ 'sql_conn_id': 'snowflake_connection',
+ 'slack_conn_id': 'slack_connection',
+ 'slack_webhook_token': 'test_token',
+ 'slack_message': 'message: {{ ds }}, {{ results_df }}',
+ 'slack_channel': '#test',
+ 'sql': "sql {{ ds }}",
+ 'dag': self.example_dag,
+ }
+ sql_to_slack_operator = self._construct_operator(**operator_args)
+
+ slack_webhook_hook = mock_slack_hook_class.return_value
+ sql_to_slack_operator._get_hook = mock_dbapi_hook
+ sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ # Test that the Slack hook is instantiated with the right parameters
+ mock_slack_hook_class.assert_called_once_with(
+ http_conn_id='slack_connection',
+ message=f'message: 2017-01-01, {test_df}',
+ channel='#test',
+ webhook_token='test_token',
+ )
+
+ # Test that the Slack hook's execute method gets run once
+ slack_webhook_hook.execute.assert_called_once()
+
+ def test_non_existing_slack_parameters_provided_exception_thrown(self):
+ operator_args = {
+ 'sql_conn_id': 'snowflake_connection',
+ 'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+ 'sql': "sql {{ ds }}",
+ }
+ with pytest.raises(AirflowException):
+ self._construct_operator(**operator_args)
+
+ @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+ def test_rendering_custom_df_name_message_execution(self, mock_slack_hook_class):
+ mock_dbapi_hook = mock.Mock()
+
+ test_df = pd.DataFrame({'a': '1', 'b': '2'}, index=[0, 1])
+ get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+ get_pandas_df_mock.return_value = test_df
+
+ operator_args = {
+ 'sql_conn_id': 'snowflake_connection',
+ 'slack_conn_id': 'slack_connection',
+ 'slack_message': 'message: {{ ds }}, {{ testing }}',
+ 'slack_channel': '#test',
+ 'sql': "sql {{ ds }}",
+ 'results_df_name': 'testing',
+ 'dag': self.example_dag,
+ }
+ sql_to_slack_operator = self._construct_operator(**operator_args)
+
+ slack_webhook_hook = mock_slack_hook_class.return_value
+ sql_to_slack_operator._get_hook = mock_dbapi_hook
+ sql_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ # Test that the Slack hook is instantiated with the right parameters
+ mock_slack_hook_class.assert_called_once_with(
+ http_conn_id='slack_connection',
+ message=f'message: 2017-01-01, {test_df}',
+ channel='#test',
+ webhook_token=None,
+ )
+
+ # Test that the Slack hook's execute method gets run once
+ slack_webhook_hook.execute.assert_called_once()
+
+ @mock.patch('airflow.operators.sql.BaseHook.get_connection')
+ def test_hook_params_building(self, mock_get_conn):
+ mock_get_conn.return_value = Connection(conn_id='snowflake_connection', conn_type='snowflake')
+ hook_params = {
+ 'schema': 'test_schema',
+ 'role': 'test_role',
+ 'database': 'test_database',
+ 'warehouse': 'test_warehouse',
+ }
+ operator_args = {
+ 'sql_conn_id': 'dummy_connection',
+ 'sql': "sql {{ ds }}",
+ 'results_df_name': 'xxxx',
+ 'sql_hook_params': hook_params,
+ 'parameters': ['1', '2', '3'],
+ 'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+ 'slack_webhook_token': 'test_token',
+ 'dag': self.example_dag,
+ }
+ sql_to_slack_operator = SqlToSlackOperator(task_id=TEST_TASK_ID, **operator_args)
+
+ assert sql_to_slack_operator.sql_hook_params == hook_params
+
+ @mock.patch('airflow.operators.sql.BaseHook.get_connection')
+ def test_hook_params(self, mock_get_conn):
+ mock_get_conn.return_value = Connection(conn_id='postgres_test', conn_type='postgres')
+ op = SqlToSlackOperator(
+ task_id='sql_hook_params',
+ sql_conn_id='postgres_test',
+ slack_webhook_token='slack_token',
+ sql="SELECT 1",
+ slack_message='message: {{ ds }}, {{ xxxx }}',
+ sql_hook_params={
+ 'schema': 'public',
+ },
+ )
+ hook = op._get_hook()
+ assert hook.schema == 'public'
+
+ @mock.patch('airflow.operators.sql.BaseHook.get_connection')
+ def test_hook_params_snowflake(self, mock_get_conn):
+ mock_get_conn.return_value = Connection(conn_id='snowflake_default', conn_type='snowflake')
+ op = SqlToSlackOperator(
+ task_id='snowflake_hook_params',
+ sql_conn_id='snowflake_default',
+ slack_conn_id='slack_default',
+ results_df_name='xxxx',
+ sql="SELECT 1",
+ slack_message='message: {{ ds }}, {{ xxxx }}',
+ sql_hook_params={
+ 'warehouse': 'warehouse',
+ 'database': 'database',
+ 'role': 'role',
+ 'schema': 'schema',
+ },
+ )
+ hook = op._get_hook()
+
+ assert hook.warehouse == 'warehouse'
+ assert hook.database == 'database'
+ assert hook.role == 'role'
+ assert hook.schema == 'schema'
diff --git a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py
index f4a28bd7cc..8973d1701f 100644
--- a/tests/providers/snowflake/transfers/test_snowflake_to_slack.py
+++ b/tests/providers/snowflake/transfers/test_snowflake_to_slack.py
@@ -20,6 +20,7 @@ from unittest import mock
from airflow.models import DAG
from airflow.providers.snowflake.transfers.snowflake_to_slack import SnowflakeToSlackOperator
from airflow.utils import timezone
+from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_runs
TEST_DAG_ID = 'snowflake_to_slack_unit_test'
@@ -38,15 +39,14 @@ class TestSnowflakeToSlackOperator:
@staticmethod
def _construct_operator(**kwargs):
- operator = SnowflakeToSlackOperator(task_id=TEST_DAG_ID, **kwargs)
- return operator
+ with conf_vars({('operators', 'allow_illegal_arguments'): 'True'}):
+ operator = SnowflakeToSlackOperator(task_id=TEST_DAG_ID, **kwargs)
+ return operator
- @mock.patch('airflow.providers.snowflake.transfers.snowflake_to_slack.SnowflakeHook')
- @mock.patch('airflow.providers.snowflake.transfers.snowflake_to_slack.SlackWebhookHook')
- def test_hooks_and_rendering(self, mock_slack_hook_class, mock_snowflake_hook_class):
+ @mock.patch('airflow.providers.slack.transfers.sql_to_slack.SlackWebhookHook')
+ def test_hooks_and_rendering(self, mock_slack_hook_class):
operator_args = {
'snowflake_conn_id': 'snowflake_connection',
- 'slack_conn_id': 'slack_connection',
'sql': "sql {{ ds }}",
'results_df_name': 'xxxx',
'warehouse': 'test_warehouse',
@@ -60,29 +60,75 @@ class TestSnowflakeToSlackOperator:
}
snowflake_to_slack_operator = self._construct_operator(**operator_args)
- snowflake_hook = mock_snowflake_hook_class.return_value
- snowflake_hook.get_pandas_df.return_value = '1234'
- slack_webhook_hook = mock_slack_hook_class.return_value
+ mock_dbapi_hook = mock.Mock()
+ snowflake_to_slack_operator._get_hook = mock_dbapi_hook
- snowflake_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- # Test that the Snowflake hook is instantiated with the right parameters
- mock_snowflake_hook_class.assert_called_once_with(
- database='test_database',
- role='test_role',
- schema='test_schema',
- snowflake_conn_id='snowflake_connection',
- warehouse='test_warehouse',
- )
+ get_pandas_df_mock = mock_dbapi_hook.return_value.get_pandas_df
+ get_pandas_df_mock.return_value = '1234'
- # Test that the get_pandas_df method is executed on the Snowflake hook with the pre-rendered sql and
- # correct params
- snowflake_hook.get_pandas_df.assert_called_once_with('sql 2017-01-01', parameters=['1', '2', '3'])
+ snowflake_to_slack_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
# Test that the Slack hook is instantiated with the right parameters
mock_slack_hook_class.assert_called_once_with(
- http_conn_id='slack_connection', message='message: 2017-01-01, 1234', webhook_token='test_token'
+ message='message: 2017-01-01, 1234',
+ webhook_token='test_token',
+ channel=None,
+ http_conn_id='slack_default',
)
- # Test that the Slack hook's execute method gets run once
- slack_webhook_hook.execute.assert_called_once()
+ def test_hook_params_building(self):
+ hook_params = {
+ 'schema': 'test_schema',
+ 'role': 'test_role',
+ 'database': 'test_database',
+ 'warehouse': 'test_warehouse',
+ }
+ operator_args = {
+ 'snowflake_conn_id': 'snowflake_connection',
+ 'sql': "sql {{ ds }}",
+ 'results_df_name': 'xxxx',
+ 'warehouse': hook_params['warehouse'],
+ 'database': hook_params['database'],
+ 'role': hook_params['role'],
+ 'schema': hook_params['schema'],
+ 'parameters': ['1', '2', '3'],
+ 'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+ 'slack_token': 'test_token',
+ 'dag': self.example_dag,
+ }
+ snowflake_operator = self._construct_operator(**operator_args)
+
+ assert snowflake_operator.sql_hook_params == hook_params
+
+ def test_partial_hook_params_building(self):
+ hook_params = {'role': 'test_role', 'database': 'test_database', 'warehouse': 'test_warehouse'}
+ operator_args = {
+ 'snowflake_conn_id': 'snowflake_connection',
+ 'sql': "sql {{ ds }}",
+ 'results_df_name': 'xxxx',
+ 'warehouse': hook_params['warehouse'],
+ 'database': hook_params['database'],
+ 'role': hook_params['role'],
+ 'schema': None,
+ 'parameters': ['1', '2', '3'],
+ 'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+ 'slack_token': 'test_token',
+ 'dag': self.example_dag,
+ }
+ snowflake_operator = self._construct_operator(**operator_args)
+
+ assert snowflake_operator.sql_hook_params == hook_params
+
+ def test_no_hook_params_building(self):
+ operator_args = {
+ 'snowflake_conn_id': 'snowflake_connection',
+ 'sql': "sql {{ ds }}",
+ 'results_df_name': 'xxxx',
+ 'parameters': ['1', '2', '3'],
+ 'slack_message': 'message: {{ ds }}, {{ xxxx }}',
+ 'slack_token': 'test_token',
+ 'dag': self.example_dag,
+ }
+ snowflake_operator = self._construct_operator(**operator_args)
+
+ assert snowflake_operator.sql_hook_params == {}
diff --git a/tests/system/providers/presto/example_presto_to_slack.py b/tests/system/providers/presto/example_presto_to_slack.py
index 91ab9c42e6..dc87c831b4 100644
--- a/tests/system/providers/presto/example_presto_to_slack.py
+++ b/tests/system/providers/presto/example_presto_to_slack.py
@@ -28,6 +28,7 @@ from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOper
PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table")
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_presto_to_slack"
+SLACK_CONN_WEBHOOK = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
with models.DAG(
dag_id=DAG_ID,
@@ -39,6 +40,7 @@ with models.DAG(
# [START howto_operator_presto_to_slack]
PrestoToSlackOperator(
task_id="presto_to_slack",
+ slack_token=SLACK_CONN_WEBHOOK,
sql=f"SELECT col FROM {PRESTO_TABLE}",
slack_channel="my_channel",
slack_message="message: {{ ds }}, {{ results_df }}",
diff --git a/tests/system/providers/slack/__init__.py b/tests/system/providers/slack/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/slack/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/presto/example_presto_to_slack.py b/tests/system/providers/slack/example_sql_to_slack.py
similarity index 75%
copy from tests/system/providers/presto/example_presto_to_slack.py
copy to tests/system/providers/slack/example_sql_to_slack.py
index 91ab9c42e6..afdf3bcf62 100644
--- a/tests/system/providers/presto/example_presto_to_slack.py
+++ b/tests/system/providers/slack/example_sql_to_slack.py
@@ -16,18 +16,19 @@
# specific language governing permissions and limitations
# under the License.
"""
-Example DAG using PrestoToSlackOperator.
+Example DAG using SqlToSlackOperator.
"""
import os
from datetime import datetime
from airflow import models
-from airflow.providers.presto.transfers.presto_to_slack import PrestoToSlackOperator
+from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
-PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table")
+SQL_TABLE = os.environ.get("SQL_TABLE", "test_table")
+SQL_CONN_ID = 'presto_default'
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "example_presto_to_slack"
+DAG_ID = "example_sql_to_slack"
with models.DAG(
dag_id=DAG_ID,
@@ -36,14 +37,16 @@ with models.DAG(
catchup=False,
tags=["example"],
) as dag:
- # [START howto_operator_presto_to_slack]
- PrestoToSlackOperator(
+ # [START howto_operator_sql_to_slack]
+ SqlToSlackOperator(
task_id="presto_to_slack",
- sql=f"SELECT col FROM {PRESTO_TABLE}",
+ sql_conn_id=SQL_CONN_ID,
+ sql=f"SELECT col FROM {SQL_TABLE}",
slack_channel="my_channel",
+ slack_conn_id='slack_default',
slack_message="message: {{ ds }}, {{ results_df }}",
)
- # [END howto_operator_presto_to_slack]
+ # [END howto_operator_sql_to_slack]
from tests.system.utils import get_test_run # noqa: E402