Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/23 14:27:06 UTC

[GitHub] [airflow] josh-fell commented on a change in pull request #18447: add RedshiftStatementHook, RedshiftOperator

josh-fell commented on a change in pull request #18447:
URL: https://github.com/apache/airflow/pull/18447#discussion_r714833401



##########
File path: airflow/providers/amazon/aws/example_dags/example_redshift.py
##########
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This is an example dag for using `RedshiftOperator` to authenticate with Amazon Redshift
+using IAM authentication then executing a simple select statement
+"""
+# [START redshift_operator_howto_guide]
+from airflow import DAG
+from airflow.providers.amazon.aws.operators.redshift import RedshiftOperator
+from airflow.utils.dates import days_ago
+
+with DAG(dag_id="example_redshift", start_date=days_ago(1), schedule_interval=None, tags=['example']) as dag:
+    # [START howto_operator_s3_to_redshift_create_table]
+    setup__task_create_table = RedshiftOperator(
+        redshift_conn_id='redshift_default',

Review comment:
       You don't necessarily need to pass the `redshift_conn_id` parameter to each `RedshiftOperator` task, since the default value is being used here.

   Or, if you would like to specify a non-default value for `redshift_conn_id`, you could add it to the DAG's `default_args`, since those args are passed to every operator in the DAG.
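
   For illustration, here is a minimal sketch of the `default_args` approach. The connection ID `my_redshift_conn` is hypothetical, and the snippet assumes `RedshiftOperator` accepts a `sql` argument like other SQL operators:
   ```python
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.redshift import RedshiftOperator
   from airflow.utils.dates import days_ago

   with DAG(
       dag_id="example_redshift",
       start_date=days_ago(1),
       schedule_interval=None,
       # every operator in the DAG inherits these keyword arguments
       default_args={"redshift_conn_id": "my_redshift_conn"},  # hypothetical conn id
       tags=["example"],
   ) as dag:
       setup__task_create_table = RedshiftOperator(
           task_id="setup__create_table",
           # illustrative SQL; any statement works here
           sql="CREATE TABLE IF NOT EXISTS fruit (fruit_id INT, name VARCHAR);",
       )
   ```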

##########
File path: docs/apache-airflow-providers-amazon/connections/redshift.rst
##########
@@ -0,0 +1,66 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/connection:redshift:
+
+Amazon Redshift Connection
+==========================
+
+The Redshift connection type enables integrations with Redshift.
+
+Authenticating to Amazon Redshift
+---------------------------------
+
+Authentication may be performed using any of the authentication methods supported by `redshift_connector <https://github.com/aws/amazon-redshift-python-driver>`_ such as via direct credentials, IAM authentication, or using an Identity Provider (IdP) plugin.
+
+Default Connection IDs
+-----------------------
+
+The default connection ID is ``redshift_default``.
+
+Configuring the Connection
+--------------------------
+
+
+Login
+  Specify the username to use for authentication with Amazon Redshift.
+
+Password
+  Specify the password to use for authentication with Amazon Redshift.
+
+Host
+  Specify the Amazon Redshift hostname.
+
+Extra
+    Specify the extra parameters (as json dictionary) that can be used in
+    Amazon Redshift connection. For a complete list of supported parameters
+    please see the `documentation <https://github.com/aws/amazon-redshift-python-driver#connection-parameters>`_
+    for redshift_connector. The following parameter is required:
+
+    * ``database``: Amazon Redshift database name.

Review comment:
       To be consistent with the connection field renaming in the `RedshiftStatementHook`, it might be clearer to rename "Login" to "User" here as well, and to bring "Database" out of `Extra` and document it as an explicit field.
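
   For reference, this is the relabeling already applied in the `RedshiftStatementHook` (excerpted from the hunk further below), which the connection docs would then mirror:
   ```python
   class RedshiftStatementHook(DbApiHook):
       @staticmethod
       def get_ui_field_behavior() -> Dict:
           """Returns custom field behavior"""
           return {
               "hidden_fields": [],
               # 'login' is shown as "User" and 'schema' as "Database" in the
               # connection form, so the database name need not live in Extra
               "relabeling": {'login': 'User', 'schema': 'Database'},
           }
   ```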

##########
File path: tests/providers/amazon/aws/hooks/test_redshift_statement.py
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import json
+import unittest
+from unittest import mock
+
+from airflow.models import Connection
+from airflow.providers.amazon.aws.hooks.redshift_statement import RedshiftStatementHook
+
+
+class TestRedshiftStatementHookConn(unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+
+        self.connection = Connection(
+            login='login', password='password', host='host', port=5439, extra=json.dumps({"database": "dev"})
+        )

Review comment:
       Same comment here as in the `RedshiftStatementHook`: if the `schema` field is going to be renamed, it seems clearer to change this to:
   ```python
   self.connection = Connection(
       login='login', password='password', host='host', port=5439, schema="dev"
   )
   ```
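
   A hypothetical follow-up assertion (the test name is illustrative) showing what the suite could then verify against `_get_conn_params` from the hook hunk below:
   ```python
   def test_get_conn_params_uses_schema(self):
       # hypothetical sketch: with the database stored on the `schema` field,
       # `_get_conn_params` should expose it as the `database` parameter
       with mock.patch.object(RedshiftStatementHook, 'get_connection', return_value=self.connection):
           hook = RedshiftStatementHook()
           conn_params, _ = hook._get_conn_params()
           assert conn_params["database"] == "dev"
   ```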

##########
File path: airflow/providers/amazon/aws/hooks/redshift_statement.py
##########
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Interact with AWS Redshift, using the boto3 library."""
+
+from typing import Callable, Dict, Optional, Tuple, Union
+
+import redshift_connector
+from redshift_connector import Connection as RedshiftConnection
+
+from airflow.hooks.dbapi import DbApiHook
+
+
+class RedshiftStatementHook(DbApiHook):
+    """
+    Execute statements against Amazon Redshift, using redshift_connector
+
+    This hook requires the redshift_conn_id connection. This connection must
+    be initialized with the host, port, login, password. Additional connection
+    options can be passed to extra as a JSON string.
+
+    :param redshift_conn_id: reference to
+        :ref:`Amazon Redshift connection id<howto/connection:redshift>`
+    :type redshift_conn_id: str
+
+    .. note::
+        get_sqlalchemy_engine() and get_uri() depend on sqlalchemy-amazon-redshift
+    """
+
+    conn_name_attr = 'redshift_conn_id'
+    default_conn_name = 'redshift_default'
+    conn_type = 'redshift'
+    hook_name = 'Amazon Redshift'
+    supports_autocommit = True
+
+    @staticmethod
+    def get_ui_field_behavior() -> Dict:
+        """Returns custom field behavior"""
+        return {
+            "hidden_fields": [],
+            "relabeling": {'login': 'User', 'schema': 'Database'},
+        }
+
+    def __init__(self, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+
+    def _get_conn_params(self) -> Tuple[Dict[str, Union[str, int]], Dict]:
+        """Helper method to retrieve connection args and kwargs"""
+        conn = self.get_connection(
+            self.redshift_conn_id  # type: ignore[attr-defined]  # pylint: disable=no-member
+        )
+
+        conn_params: Dict[str, Union[str, int]] = {
+            "user": conn.login or '',
+            "password": conn.password or '',
+            "host": conn.host or '',
+            "port": conn.port or 5439,
+            "database": conn.extra_dejson.pop('database', ''),

Review comment:
       Since the `schema` field is being renamed to "Database" in the connection form, this should be:
   ```python
   "database": conn.schema
   ```

   Otherwise this may confuse users: there would be a separate "Database" field that is never used, while they would still have to put the desired database value in `Extra`.
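
   Applied to the `_get_conn_params` hunk above, the suggestion would look roughly like this (the `or ''` fallback is an assumption, mirroring the other fields):
   ```python
   conn_params: Dict[str, Union[str, int]] = {
       "user": conn.login or '',
       "password": conn.password or '',
       "host": conn.host or '',
       "port": conn.port or 5439,
       # read the database name from the relabeled `schema` field
       # instead of popping it out of the Extra JSON
       "database": conn.schema or '',
   }
   ```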



