You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2023/07/06 06:32:50 UTC

[airflow] branch main updated: D205 Support - Providers: Snowflake to Zendesk (inclusive) (#32359)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 21e8f878a3 D205 Support - Providers: Snowflake to Zendesk (inclusive) (#32359)
21e8f878a3 is described below

commit 21e8f878a3c91250d0d198c6c3675b4b350fcb61
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Wed Jul 5 23:32:40 2023 -0700

    D205 Support - Providers: Snowflake to Zendesk (inclusive) (#32359)
---
 .../providers/snowflake/hooks/snowflake_sql_api.py | 25 ++++++++++++----------
 airflow/providers/snowflake/operators/snowflake.py | 20 ++++++++---------
 .../snowflake/transfers/copy_into_snowflake.py     |  5 +----
 .../snowflake/transfers/snowflake_to_slack.py      | 14 ++++++------
 .../snowflake/triggers/snowflake_trigger.py        |  6 +-----
 .../snowflake/utils/sql_api_generate_jwt.py        |  6 +++++-
 airflow/providers/tableau/hooks/tableau.py         |  3 +--
 airflow/providers/tableau/operators/tableau.py     |  1 +
 airflow/providers/tabular/hooks/tabular.py         |  6 ++++--
 airflow/providers/telegram/operators/telegram.py   |  1 +
 airflow/providers/trino/hooks/trino.py             |  3 +--
 11 files changed, 47 insertions(+), 43 deletions(-)

diff --git a/airflow/providers/snowflake/hooks/snowflake_sql_api.py b/airflow/providers/snowflake/hooks/snowflake_sql_api.py
index eec3c7349e..b10a3c670f 100644
--- a/airflow/providers/snowflake/hooks/snowflake_sql_api.py
+++ b/airflow/providers/snowflake/hooks/snowflake_sql_api.py
@@ -33,10 +33,10 @@ from airflow.providers.snowflake.utils.sql_api_generate_jwt import JWTGenerator
 
 class SnowflakeSqlApiHook(SnowflakeHook):
     """
-    A client to interact with Snowflake using SQL API  and allows submitting
-    multiple SQL statements in a single request. In combination with aiohttp, make post request to submit SQL
-    statements for execution, poll to check the status of the execution of a statement. Fetch query results
-    asynchronously.
+    A client to interact with Snowflake using SQL API and submit multiple SQL statements in a single request.
+
+    In combination with aiohttp, make post request to submit SQL statements for execution,
+    poll to check the status of the execution of a statement. Fetch query results asynchronously.
 
     This hook requires the snowflake_conn_id connection. This hooks mainly uses account, schema, database,
      warehouse, private_key_file or private_key_content field must be setup in the connection. Other inputs
@@ -137,7 +137,10 @@ class SnowflakeSqlApiHook(SnowflakeHook):
         conn_config = self._get_conn_params()
 
         req_id = uuid.uuid4()
-        url = f"https://{conn_config['account']}.{conn_config['region']}.snowflakecomputing.com/api/v2/statements"
+        url = (
+            f"https://{conn_config['account']}.{conn_config['region']}"
+            f".snowflakecomputing.com/api/v2/statements"
+        )
         params: dict[str, Any] | None = {"requestId": str(req_id), "async": True, "pageSize": 10}
         headers = self.get_headers()
         if bindings is None:
@@ -171,9 +174,7 @@ class SnowflakeSqlApiHook(SnowflakeHook):
         return self.query_ids
 
     def get_headers(self) -> dict[str, Any]:
-        """Based on the private key, and with connection details JWT Token is generated and header
-        is formed.
-        """
+        """Form JWT Token and header based on the private key and connection details."""
         if not self.private_key:
             self.get_private_key()
         conn_config = self._get_conn_params()
@@ -206,13 +207,15 @@ class SnowflakeSqlApiHook(SnowflakeHook):
         req_id = uuid.uuid4()
         header = self.get_headers()
         params = {"requestId": str(req_id)}
-        url = f"https://{conn_config['account']}.{conn_config['region']}.snowflakecomputing.com/api/v2/statements/{query_id}"
+        url = (
+            f"https://{conn_config['account']}.{conn_config['region']}"
+            f".snowflakecomputing.com/api/v2/statements/{query_id}"
+        )
         return header, params, url
 
     def check_query_output(self, query_ids: list[str]) -> None:
         """
-        Based on the query ids passed as the parameter make HTTP request to snowflake SQL API and logs
-        the response.
+        Make HTTP request to snowflake SQL API based on the provided query ids and log the response.
 
         :param query_ids: statement handles query id for the individual statements.
         """
diff --git a/airflow/providers/snowflake/operators/snowflake.py b/airflow/providers/snowflake/operators/snowflake.py
index db35fa0007..4087397d83 100644
--- a/airflow/providers/snowflake/operators/snowflake.py
+++ b/airflow/providers/snowflake/operators/snowflake.py
@@ -135,10 +135,11 @@ class SnowflakeOperator(SQLExecuteQueryOperator):
 
 class SnowflakeCheckOperator(SQLCheckOperator):
     """
-    Performs a check against Snowflake. The ``SnowflakeCheckOperator`` expects
-    a sql query that will return a single row. Each value on that
-    first row is evaluated using python ``bool`` casting. If any of the
-    values return ``False`` the check is failed and errors out.
+    Performs a check against Snowflake.
+
+    The ``SnowflakeCheckOperator`` expects a sql query that will return a single row. Each
+    value on that first row is evaluated using python ``bool`` casting. If any of the values
+    return ``False`` the check is failed and errors out.
 
     Note that Python bool casting evals the following as ``False``:
 
@@ -225,8 +226,7 @@ class SnowflakeCheckOperator(SQLCheckOperator):
 
 class SnowflakeValueCheckOperator(SQLValueCheckOperator):
     """
-    Performs a simple check using sql code against a specified value, within a
-    certain level of tolerance.
+    Performs a simple check using sql code against a specified value, within a certain level of tolerance.
 
     :param sql: the sql to be executed
     :param pass_value: the value to check against
@@ -293,8 +293,7 @@ class SnowflakeValueCheckOperator(SQLValueCheckOperator):
 
 class SnowflakeIntervalCheckOperator(SQLIntervalCheckOperator):
     """
-    Checks that the values of metrics given as SQL expressions are within
-    a certain tolerance of the ones from days_back before.
+    Checks that the metrics given as SQL expressions are within tolerance of the ones from days_back before.
 
     This method constructs a query like so ::
 
@@ -479,6 +478,7 @@ class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
     def execute(self, context: Context) -> None:
         """
         Make a POST API request to snowflake by using SnowflakeSQL and execute the query to get the ids.
+
         By deferring the SnowflakeSqlApiTrigger class passed along with query ids.
         """
         self.log.info("Executing: %s", self.sql)
@@ -539,8 +539,8 @@ class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
     def execute_complete(self, context: Context, event: dict[str, str | list[str]] | None = None) -> None:
         """
         Callback for when the trigger fires - returns immediately.
-        Relies on trigger to throw an exception, otherwise it assumes execution was
-        successful.
+
+        Relies on trigger to throw an exception, otherwise it assumes execution was successful.
         """
         if event:
             if "status" in event and event["status"] == "error":
diff --git a/airflow/providers/snowflake/transfers/copy_into_snowflake.py b/airflow/providers/snowflake/transfers/copy_into_snowflake.py
index 6e1d22cd2c..10071add1a 100644
--- a/airflow/providers/snowflake/transfers/copy_into_snowflake.py
+++ b/airflow/providers/snowflake/transfers/copy_into_snowflake.py
@@ -15,10 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-This module contains abstract operator that child classes
-implement "COPY INTO <TABLE> SQL in Snowflake".
-"""
+"""Abstract operator that child classes implement ``COPY INTO <TABLE> SQL in Snowflake``."""
 from __future__ import annotations
 
 from typing import Any, Sequence
diff --git a/airflow/providers/snowflake/transfers/snowflake_to_slack.py b/airflow/providers/snowflake/transfers/snowflake_to_slack.py
index b4b441a585..50d38b9471 100644
--- a/airflow/providers/snowflake/transfers/snowflake_to_slack.py
+++ b/airflow/providers/snowflake/transfers/snowflake_to_slack.py
@@ -25,12 +25,14 @@ from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
 
 class SnowflakeToSlackOperator(SqlToSlackOperator):
     """
-    Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are
-    rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
-    results_df }}'. The 'results_df' variable name can be changed by specifying a different
-    'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to
-    allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df |
-    tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table.
+    Executes an SQL statement in Snowflake and sends the results to Slack.
+
+    The results of the query are rendered into the 'slack_message' parameter as a Pandas dataframe
+    using a Jinja variable called '{{ results_df }}'. The 'results_df' variable name can be changed
+    by specifying a different 'results_df_name' parameter. The Tabulate library is added to the
+    Jinja environment as a filter to allow the dataframe to be rendered nicely. For example, set
+    'slack_message' to {{ results_df | tabulate(tablefmt="pretty", headers="keys") }} to send the
+    results to Slack as an ASCII rendered table.
 
     .. seealso::
         For more information on how to use this operator, take a look at the guide:
diff --git a/airflow/providers/snowflake/triggers/snowflake_trigger.py b/airflow/providers/snowflake/triggers/snowflake_trigger.py
index 4f1e0cffb2..bb426fa3d0 100644
--- a/airflow/providers/snowflake/triggers/snowflake_trigger.py
+++ b/airflow/providers/snowflake/triggers/snowflake_trigger.py
@@ -93,11 +93,7 @@ class SnowflakeSqlApiTrigger(BaseTrigger):
             yield TriggerEvent({"status": "error", "message": str(e)})
 
     async def get_query_status(self, query_id: str) -> dict[str, Any]:
-        """
-        Async function to check whether the query statement submitted via SQL API is still
-        running state and returns True if it is still running else
-        return False.
-        """
+        """Return True if the SQL query is still running, otherwise return False."""
         hook = SnowflakeSqlApiHook(
             self.snowflake_conn_id,
             self.token_life_time,
diff --git a/airflow/providers/snowflake/utils/sql_api_generate_jwt.py b/airflow/providers/snowflake/utils/sql_api_generate_jwt.py
index 883ef3aae0..553d84caf7 100644
--- a/airflow/providers/snowflake/utils/sql_api_generate_jwt.py
+++ b/airflow/providers/snowflake/utils/sql_api_generate_jwt.py
@@ -42,6 +42,7 @@ SUBJECT = "sub"
 class JWTGenerator:
     """
     Creates and signs a JWT with the specified private key file, username, and account identifier.
+
     The JWTGenerator keeps the generated token and only regenerates the token if a specified period
     of time has passed.
 
@@ -92,6 +93,7 @@ class JWTGenerator:
     def prepare_account_name_for_jwt(self, raw_account: str) -> str:
         """
         Prepare the account identifier for use in the JWT.
+
         For the JWT, the account identifier must not include the subdomain or any region or cloud provider
         information.
 
@@ -113,7 +115,9 @@ class JWTGenerator:
 
     def get_token(self) -> str | None:
         """
-        Generates a new JWT. If a JWT has been already been generated earlier, return the previously
+        Generates a new JWT.
+
+        If a JWT has already been generated earlier, return the previously
         generated token unless the specified renewal time has passed.
         """
         now = datetime.now(timezone.utc)  # Fetch the current time
diff --git a/airflow/providers/tableau/hooks/tableau.py b/airflow/providers/tableau/hooks/tableau.py
index d801ca97a2..5127a3dc9a 100644
--- a/airflow/providers/tableau/hooks/tableau.py
+++ b/airflow/providers/tableau/hooks/tableau.py
@@ -162,8 +162,7 @@ class TableauHook(BaseHook):
 
     def wait_for_state(self, job_id: str, target_state: TableauJobFinishCode, check_interval: float) -> bool:
         """
-        Wait until the current state of a defined Tableau Job is equal
-        to target_state or different from PENDING.
+        Wait until the current state of a defined Tableau Job is target_state or different from PENDING.
 
         :param job_id: The id of the job to check.
         :param target_state: Enum that describe the Tableau job's target state
diff --git a/airflow/providers/tableau/operators/tableau.py b/airflow/providers/tableau/operators/tableau.py
index 5e13c2d6f4..318744835e 100644
--- a/airflow/providers/tableau/operators/tableau.py
+++ b/airflow/providers/tableau/operators/tableau.py
@@ -95,6 +95,7 @@ class TableauOperator(BaseOperator):
     def execute(self, context: Context) -> str:
         """
         Executes the Tableau API resource and pushes the job id or downloaded file URI to xcom.
+
         :param context: The task context during execution.
         :return: the id of the job that executes the extract refresh or downloaded file URI.
         """
diff --git a/airflow/providers/tabular/hooks/tabular.py b/airflow/providers/tabular/hooks/tabular.py
index 5fed3888f9..da99cfbb9f 100644
--- a/airflow/providers/tabular/hooks/tabular.py
+++ b/airflow/providers/tabular/hooks/tabular.py
@@ -30,8 +30,10 @@ TOKENS_ENDPOINT = "oauth/tokens"
 
 class TabularHook(BaseHook):
     """
-    This hook acts as a base hook for tabular services. It offers the ability to generate temporary,
-    short-lived session tokens to use within Airflow submitted jobs.
+    This hook acts as a base hook for tabular services.
+
+    It offers the ability to generate temporary, short-lived
+    session tokens to use within Airflow submitted jobs.
 
     :param tabular_conn_id: The :ref:`Tabular connection id<howto/connection:tabular>`
         which refers to the information to connect to the Tabular OAuth service.
diff --git a/airflow/providers/telegram/operators/telegram.py b/airflow/providers/telegram/operators/telegram.py
index 840011ce0b..4c85487e2c 100644
--- a/airflow/providers/telegram/operators/telegram.py
+++ b/airflow/providers/telegram/operators/telegram.py
@@ -31,6 +31,7 @@ if TYPE_CHECKING:
 class TelegramOperator(BaseOperator):
     """
     This operator allows you to post messages to Telegram using Telegram Bot API.
+
     Takes both Telegram Bot API token directly or connection that has Telegram token in password field.
     If both supplied, token parameter will be given precedence.
 
diff --git a/airflow/providers/trino/hooks/trino.py b/airflow/providers/trino/hooks/trino.py
index 09fbe6efa6..67ee432ca3 100644
--- a/airflow/providers/trino/hooks/trino.py
+++ b/airflow/providers/trino/hooks/trino.py
@@ -240,8 +240,7 @@ class TrinoHook(DbApiHook):
     @staticmethod
     def _serialize_cell(cell: Any, conn: Connection | None = None) -> Any:
         """
-        Trino will adapt all arguments to the execute() method internally,
-        hence we return cell without any conversion.
+        Trino will adapt all execute() args internally, hence we return cell without any conversion.
 
         :param cell: The cell to insert into the table
         :param conn: The database connection