You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2023/07/06 06:32:50 UTC
[airflow] branch main updated: D205 Support - Providers: Snowflake to Zendesk (inclusive) (#32359)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 21e8f878a3 D205 Support - Providers: Snowflake to Zendesk (inclusive) (#32359)
21e8f878a3 is described below
commit 21e8f878a3c91250d0d198c6c3675b4b350fcb61
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Wed Jul 5 23:32:40 2023 -0700
D205 Support - Providers: Snowflake to Zendesk (inclusive) (#32359)
---
.../providers/snowflake/hooks/snowflake_sql_api.py | 25 ++++++++++++----------
airflow/providers/snowflake/operators/snowflake.py | 20 ++++++++---------
.../snowflake/transfers/copy_into_snowflake.py | 5 +----
.../snowflake/transfers/snowflake_to_slack.py | 14 ++++++------
.../snowflake/triggers/snowflake_trigger.py | 6 +-----
.../snowflake/utils/sql_api_generate_jwt.py | 6 +++++-
airflow/providers/tableau/hooks/tableau.py | 3 +--
airflow/providers/tableau/operators/tableau.py | 1 +
airflow/providers/tabular/hooks/tabular.py | 6 ++++--
airflow/providers/telegram/operators/telegram.py | 1 +
airflow/providers/trino/hooks/trino.py | 3 +--
11 files changed, 47 insertions(+), 43 deletions(-)
diff --git a/airflow/providers/snowflake/hooks/snowflake_sql_api.py b/airflow/providers/snowflake/hooks/snowflake_sql_api.py
index eec3c7349e..b10a3c670f 100644
--- a/airflow/providers/snowflake/hooks/snowflake_sql_api.py
+++ b/airflow/providers/snowflake/hooks/snowflake_sql_api.py
@@ -33,10 +33,10 @@ from airflow.providers.snowflake.utils.sql_api_generate_jwt import JWTGenerator
class SnowflakeSqlApiHook(SnowflakeHook):
"""
- A client to interact with Snowflake using SQL API and allows submitting
- multiple SQL statements in a single request. In combination with aiohttp, make post request to submit SQL
- statements for execution, poll to check the status of the execution of a statement. Fetch query results
- asynchronously.
+ A client to interact with Snowflake using SQL API and submit multiple SQL statements in a single request.
+
+ In combination with aiohttp, make post request to submit SQL statements for execution,
+ poll to check the status of the execution of a statement. Fetch query results asynchronously.
This hook requires the snowflake_conn_id connection. This hooks mainly uses account, schema, database,
warehouse, private_key_file or private_key_content field must be setup in the connection. Other inputs
@@ -137,7 +137,10 @@ class SnowflakeSqlApiHook(SnowflakeHook):
conn_config = self._get_conn_params()
req_id = uuid.uuid4()
- url = f"https://{conn_config['account']}.{conn_config['region']}.snowflakecomputing.com/api/v2/statements"
+ url = (
+ f"https://{conn_config['account']}.{conn_config['region']}"
+ f".snowflakecomputing.com/api/v2/statements"
+ )
params: dict[str, Any] | None = {"requestId": str(req_id), "async": True, "pageSize": 10}
headers = self.get_headers()
if bindings is None:
@@ -171,9 +174,7 @@ class SnowflakeSqlApiHook(SnowflakeHook):
return self.query_ids
def get_headers(self) -> dict[str, Any]:
- """Based on the private key, and with connection details JWT Token is generated and header
- is formed.
- """
+ """Form JWT Token and header based on the private key, and connection details."""
if not self.private_key:
self.get_private_key()
conn_config = self._get_conn_params()
@@ -206,13 +207,15 @@ class SnowflakeSqlApiHook(SnowflakeHook):
req_id = uuid.uuid4()
header = self.get_headers()
params = {"requestId": str(req_id)}
- url = f"https://{conn_config['account']}.{conn_config['region']}.snowflakecomputing.com/api/v2/statements/{query_id}"
+ url = (
+ f"https://{conn_config['account']}.{conn_config['region']}"
+ f".snowflakecomputing.com/api/v2/statements/{query_id}"
+ )
return header, params, url
def check_query_output(self, query_ids: list[str]) -> None:
"""
- Based on the query ids passed as the parameter make HTTP request to snowflake SQL API and logs
- the response.
+ Make HTTP request to snowflake SQL API based on the provided query ids and log the response.
:param query_ids: statement handles query id for the individual statements.
"""
diff --git a/airflow/providers/snowflake/operators/snowflake.py b/airflow/providers/snowflake/operators/snowflake.py
index db35fa0007..4087397d83 100644
--- a/airflow/providers/snowflake/operators/snowflake.py
+++ b/airflow/providers/snowflake/operators/snowflake.py
@@ -135,10 +135,11 @@ class SnowflakeOperator(SQLExecuteQueryOperator):
class SnowflakeCheckOperator(SQLCheckOperator):
"""
- Performs a check against Snowflake. The ``SnowflakeCheckOperator`` expects
- a sql query that will return a single row. Each value on that
- first row is evaluated using python ``bool`` casting. If any of the
- values return ``False`` the check is failed and errors out.
+ Performs a check against Snowflake.
+
+ The ``SnowflakeCheckOperator`` expects a sql query that will return a single row. Each
+ value on that first row is evaluated using python ``bool`` casting. If any of the values
+ return ``False`` the check is failed and errors out.
Note that Python bool casting evals the following as ``False``:
@@ -225,8 +226,7 @@ class SnowflakeCheckOperator(SQLCheckOperator):
class SnowflakeValueCheckOperator(SQLValueCheckOperator):
"""
- Performs a simple check using sql code against a specified value, within a
- certain level of tolerance.
+ Performs a simple check using sql code against a specified value, within a certain level of tolerance.
:param sql: the sql to be executed
:param pass_value: the value to check against
@@ -293,8 +293,7 @@ class SnowflakeValueCheckOperator(SQLValueCheckOperator):
class SnowflakeIntervalCheckOperator(SQLIntervalCheckOperator):
"""
- Checks that the values of metrics given as SQL expressions are within
- a certain tolerance of the ones from days_back before.
+ Checks that the metrics given as SQL expressions are within tolerance of the ones from days_back before.
This method constructs a query like so ::
@@ -479,6 +478,7 @@ class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
def execute(self, context: Context) -> None:
"""
Make a POST API request to snowflake by using SnowflakeSQL and execute the query to get the ids.
+
By deferring the SnowflakeSqlApiTrigger class passed along with query ids.
"""
self.log.info("Executing: %s", self.sql)
@@ -539,8 +539,8 @@ class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
def execute_complete(self, context: Context, event: dict[str, str | list[str]] | None = None) -> None:
"""
Callback for when the trigger fires - returns immediately.
- Relies on trigger to throw an exception, otherwise it assumes execution was
- successful.
+
+ Relies on trigger to throw an exception, otherwise it assumes execution was successful.
"""
if event:
if "status" in event and event["status"] == "error":
diff --git a/airflow/providers/snowflake/transfers/copy_into_snowflake.py b/airflow/providers/snowflake/transfers/copy_into_snowflake.py
index 6e1d22cd2c..10071add1a 100644
--- a/airflow/providers/snowflake/transfers/copy_into_snowflake.py
+++ b/airflow/providers/snowflake/transfers/copy_into_snowflake.py
@@ -15,10 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-This module contains abstract operator that child classes
-implement "COPY INTO <TABLE> SQL in Snowflake".
-"""
+"""Abstract operator that child classes implement ``COPY INTO <TABLE> SQL in Snowflake``."""
from __future__ import annotations
from typing import Any, Sequence
diff --git a/airflow/providers/snowflake/transfers/snowflake_to_slack.py b/airflow/providers/snowflake/transfers/snowflake_to_slack.py
index b4b441a585..50d38b9471 100644
--- a/airflow/providers/snowflake/transfers/snowflake_to_slack.py
+++ b/airflow/providers/snowflake/transfers/snowflake_to_slack.py
@@ -25,12 +25,14 @@ from airflow.providers.slack.transfers.sql_to_slack import SqlToSlackOperator
class SnowflakeToSlackOperator(SqlToSlackOperator):
"""
- Executes an SQL statement in Snowflake and sends the results to Slack. The results of the query are
- rendered into the 'slack_message' parameter as a Pandas dataframe using a JINJA variable called '{{
- results_df }}'. The 'results_df' variable name can be changed by specifying a different
- 'results_df_name' parameter. The Tabulate library is added to the JINJA environment as a filter to
- allow the dataframe to be rendered nicely. For example, set 'slack_message' to {{ results_df |
- tabulate(tablefmt="pretty", headers="keys") }} to send the results to Slack as an ascii rendered table.
+ Executes an SQL statement in Snowflake and sends the results to Slack.
+
+ The results of the query are rendered into the 'slack_message' parameter as a Pandas dataframe
+ using a Jinja variable called '{{ results_df }}'. The 'results_df' variable name can be changed
+ by specifying a different 'results_df_name' parameter. The Tabulate library is added to the
+ Jinja environment as a filter to allow the dataframe to be rendered nicely. For example, set
+ 'slack_message' to {{ results_df | tabulate(tablefmt="pretty", headers="keys") }} to send the
+ results to Slack as an ASCII rendered table.
.. seealso::
For more information on how to use this operator, take a look at the guide:
diff --git a/airflow/providers/snowflake/triggers/snowflake_trigger.py b/airflow/providers/snowflake/triggers/snowflake_trigger.py
index 4f1e0cffb2..bb426fa3d0 100644
--- a/airflow/providers/snowflake/triggers/snowflake_trigger.py
+++ b/airflow/providers/snowflake/triggers/snowflake_trigger.py
@@ -93,11 +93,7 @@ class SnowflakeSqlApiTrigger(BaseTrigger):
yield TriggerEvent({"status": "error", "message": str(e)})
async def get_query_status(self, query_id: str) -> dict[str, Any]:
- """
- Async function to check whether the query statement submitted via SQL API is still
- running state and returns True if it is still running else
- return False.
- """
+ """Return True if the SQL query is still running otherwise return False."""
hook = SnowflakeSqlApiHook(
self.snowflake_conn_id,
self.token_life_time,
diff --git a/airflow/providers/snowflake/utils/sql_api_generate_jwt.py b/airflow/providers/snowflake/utils/sql_api_generate_jwt.py
index 883ef3aae0..553d84caf7 100644
--- a/airflow/providers/snowflake/utils/sql_api_generate_jwt.py
+++ b/airflow/providers/snowflake/utils/sql_api_generate_jwt.py
@@ -42,6 +42,7 @@ SUBJECT = "sub"
class JWTGenerator:
"""
Creates and signs a JWT with the specified private key file, username, and account identifier.
+
The JWTGenerator keeps the generated token and only regenerates the token if a specified period
of time has passed.
@@ -92,6 +93,7 @@ class JWTGenerator:
def prepare_account_name_for_jwt(self, raw_account: str) -> str:
"""
Prepare the account identifier for use in the JWT.
+
For the JWT, the account identifier must not include the subdomain or any region or cloud provider
information.
@@ -113,7 +115,9 @@ class JWTGenerator:
def get_token(self) -> str | None:
"""
- Generates a new JWT. If a JWT has been already been generated earlier, return the previously
+ Generates a new JWT.
+
+ If a JWT has been already been generated earlier, return the previously
generated token unless the specified renewal time has passed.
"""
now = datetime.now(timezone.utc) # Fetch the current time
diff --git a/airflow/providers/tableau/hooks/tableau.py b/airflow/providers/tableau/hooks/tableau.py
index d801ca97a2..5127a3dc9a 100644
--- a/airflow/providers/tableau/hooks/tableau.py
+++ b/airflow/providers/tableau/hooks/tableau.py
@@ -162,8 +162,7 @@ class TableauHook(BaseHook):
def wait_for_state(self, job_id: str, target_state: TableauJobFinishCode, check_interval: float) -> bool:
"""
- Wait until the current state of a defined Tableau Job is equal
- to target_state or different from PENDING.
+ Wait until the current state of a defined Tableau Job is target_state or different from PENDING.
:param job_id: The id of the job to check.
:param target_state: Enum that describe the Tableau job's target state
diff --git a/airflow/providers/tableau/operators/tableau.py b/airflow/providers/tableau/operators/tableau.py
index 5e13c2d6f4..318744835e 100644
--- a/airflow/providers/tableau/operators/tableau.py
+++ b/airflow/providers/tableau/operators/tableau.py
@@ -95,6 +95,7 @@ class TableauOperator(BaseOperator):
def execute(self, context: Context) -> str:
"""
Executes the Tableau API resource and pushes the job id or downloaded file URI to xcom.
+
:param context: The task context during execution.
:return: the id of the job that executes the extract refresh or downloaded file URI.
"""
diff --git a/airflow/providers/tabular/hooks/tabular.py b/airflow/providers/tabular/hooks/tabular.py
index 5fed3888f9..da99cfbb9f 100644
--- a/airflow/providers/tabular/hooks/tabular.py
+++ b/airflow/providers/tabular/hooks/tabular.py
@@ -30,8 +30,10 @@ TOKENS_ENDPOINT = "oauth/tokens"
class TabularHook(BaseHook):
"""
- This hook acts as a base hook for tabular services. It offers the ability to generate temporary,
- short-lived session tokens to use within Airflow submitted jobs.
+ This hook acts as a base hook for tabular services.
+
+ It offers the ability to generate temporary, short-lived
+ session tokens to use within Airflow submitted jobs.
:param tabular_conn_id: The :ref:`Tabular connection id<howto/connection:tabular>`
which refers to the information to connect to the Tabular OAuth service.
diff --git a/airflow/providers/telegram/operators/telegram.py b/airflow/providers/telegram/operators/telegram.py
index 840011ce0b..4c85487e2c 100644
--- a/airflow/providers/telegram/operators/telegram.py
+++ b/airflow/providers/telegram/operators/telegram.py
@@ -31,6 +31,7 @@ if TYPE_CHECKING:
class TelegramOperator(BaseOperator):
"""
This operator allows you to post messages to Telegram using Telegram Bot API.
+
Takes both Telegram Bot API token directly or connection that has Telegram token in password field.
If both supplied, token parameter will be given precedence.
diff --git a/airflow/providers/trino/hooks/trino.py b/airflow/providers/trino/hooks/trino.py
index 09fbe6efa6..67ee432ca3 100644
--- a/airflow/providers/trino/hooks/trino.py
+++ b/airflow/providers/trino/hooks/trino.py
@@ -240,8 +240,7 @@ class TrinoHook(DbApiHook):
@staticmethod
def _serialize_cell(cell: Any, conn: Connection | None = None) -> Any:
"""
- Trino will adapt all arguments to the execute() method internally,
- hence we return cell without any conversion.
+ Trino will adapt all execute() args internally, hence we return cell without any conversion.
:param cell: The cell to insert into the table
:param conn: The database connection