Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/24 07:26:07 UTC

[GitHub] [airflow] dstandish commented on a change in pull request #19137: Add RedshiftDataHook

dstandish commented on a change in pull request #19137:
URL: https://github.com/apache/airflow/pull/19137#discussion_r813599714



##########
File path: airflow/providers/amazon/aws/operators/redshift_data.py
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param cluster_identifier: unique identifier of a cluster
+    :param database: the name of the database
+    :param sql: the SQL statement text to run
+    :param db_user: the database user name
+    :param parameters: the parameters for the SQL statement
+    :param secret_arn: the name or ARN of the secret that enables db access
+    :param statement_name: the name of the SQL statement
+    :param with_event: indicates whether to send an event to EventBridge
+    :param timeout: how long in seconds to wait for a response, if 0 don't wait
+    :param poll_interval: how often in seconds to check the query status
+    """
+
+    template_fields = (
+        'cluster_identifier',
+        'database',
+        'sql',
+        'db_user',
+        'parameters',
+    )
+    template_ext = ('.sql',)
+    template_fields_renderers = {'sql': 'sql'}
+
+    def __init__(
+        self,
+        database: str,
+        sql: str,
+        cluster_identifier: Optional[str] = "",

Review comment:
       we generally use `None` as the default value for optionals; see the sketch below
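   
   A minimal signature sketch (just the parameter names from the diff, with `None` defaults):
   
   ```python
   from typing import Optional
   
   from airflow.models import BaseOperator
   
   
   class RedshiftDataOperator(BaseOperator):
       def __init__(
           self,
           database: str,
           sql: str,
           cluster_identifier: Optional[str] = None,  # None (not "") means "not provided"
           db_user: Optional[str] = None,
           parameters: Optional[list] = None,
           secret_arn: Optional[str] = None,
           statement_name: Optional[str] = None,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
   ```
   
   `None` also lets downstream code tell "not provided" apart from an intentionally empty string.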

##########
File path: airflow/providers/amazon/aws/hooks/redshift_data.py
##########
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Interact with AWS Redshift clusters."""
+
+from typing import Any, Dict, Optional
+
+from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
+
+
+class RedshiftDataHook(AwsBaseHook):
+    """
+    Interact with AWS Redshift Data, using the boto3 library
+
+    Additional arguments (such as ``aws_conn_id``) may be specified and
+    are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    """
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs["client_type"] = "redshift-data"
+        super().__init__(*args, **kwargs)
+
+    def execute_statement(

Review comment:
       I would suggest simplifying this hook by removing the methods that are essentially aliases for the boto3 client methods, such as this one.
   
   Instead, just use the underlying client directly in your operator, add the library https://pypi.org/project/mypy-boto3-redshift-data/, and type-hint the client object so that users still get nice autocomplete.
   
   We are doing the exact same thing here: https://github.com/apache/airflow/pull/20642/files#diff-07892cfb7f2a919afec9185e36b417d75d5322d243096667ca1cf05c506d9533R51-R59
   
   Implementing this kind of "alias" method doesn't really add any value, but it does introduce a maintenance burden.
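   
   A rough sketch of the typed-client pattern (assuming the linked package, which ships a `RedshiftDataAPIServiceClient` type, and the `conn` property pattern from the PR above):
   
   ```python
   from typing import TYPE_CHECKING
   
   from airflow.compat.functools import cached_property
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   if TYPE_CHECKING:
       from mypy_boto3_redshift_data import RedshiftDataAPIServiceClient
   
   
   class RedshiftDataHook(AwsBaseHook):
       """Thin hook exposing a typed boto3 "redshift-data" client."""
   
       def __init__(self, *args, **kwargs) -> None:
           kwargs["client_type"] = "redshift-data"
           super().__init__(*args, **kwargs)
   
       @cached_property
       def conn(self) -> 'RedshiftDataAPIServiceClient':
           """Typed client, so editors can autocomplete execute_statement etc."""
           return super().conn
   ```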

##########
File path: airflow/providers/amazon/aws/operators/redshift_data.py
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param cluster_identifier: unique identifier of a cluster
+    :param database: the name of the database
+    :param sql: the SQL statement text to run
+    :param db_user: the database user name
+    :param parameters: the parameters for the SQL statement
+    :param secret_arn: the name or ARN of the secret that enables db access
+    :param statement_name: the name of the SQL statement
+    :param with_event: indicates whether to send an event to EventBridge
+    :param timeout: how long in seconds to wait for a response, if 0 don't wait
+    :param poll_interval: how often in seconds to check the query status
+    """
+
+    template_fields = (
+        'cluster_identifier',
+        'database',
+        'sql',
+        'db_user',
+        'parameters',
+    )
+    template_ext = ('.sql',)
+    template_fields_renderers = {'sql': 'sql'}
+
+    def __init__(
+        self,
+        database: str,
+        sql: str,
+        cluster_identifier: Optional[str] = "",
+        db_user: Optional[str] = "",
+        parameters: Optional[list] = None,
+        secret_arn: Optional[str] = "",
+        statement_name: Optional[str] = "",
+        with_event: bool = False,
+        timeout: int = 300,
+        poll_interval: int = 10,
+        aws_conn_id: str = 'aws_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.aws_conn_id = aws_conn_id
+        self.sql = sql
+        self.parameters = parameters
+        self.cluster_identifier = cluster_identifier
+        self.database = database
+        self.db_user = db_user
+        self.secret_arn = secret_arn
+        self.statement_name = statement_name
+        self.with_event = with_event
+        self.timeout = timeout
+        self.statement_id = None
+        if poll_interval > 0:
+            self.poll_interval = poll_interval
+        else:
+            self.log.warning(
+                "Invalid poll_interval:",
+                poll_interval,
+            )
+
+    @cached_property
+    def hook(self) -> RedshiftDataHook:
+        """Create and return an RedshiftDataHook."""
+        return RedshiftDataHook(aws_conn_id=self.aws_conn_id)
+
+    def execute_query(self):
+        resp = self.hook.execute_statement(
+            cluster_identifier=self.cluster_identifier,
+            database=self.database,
+            db_user=self.db_user,
+            sql=self.sql,
+            parameters=self.parameters,
+            secret_arn=self.secret_arn,
+            statement_name=self.statement_name,
+            with_event=self.with_event,
+        )
+        return resp['Id']
+
+    def wait_for_results(self, statement_id):
+        elapsed = 0
+        while elapsed < self.timeout:
+            self.log.info("Polling", statement_id)

Review comment:
       The stdlib logger uses lazy %-style interpolation; extra positional args without a matching placeholder in the message mean it fails to format at log time.
   ```suggestion
            self.log.info("Polling statement %r", statement_id)
   ```

##########
File path: airflow/providers/amazon/aws/operators/redshift_data.py
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param cluster_identifier: unique identifier of a cluster
+    :param database: the name of the database
+    :param sql: the SQL statement text to run
+    :param db_user: the database user name
+    :param parameters: the parameters for the SQL statement
+    :param secret_arn: the name or ARN of the secret that enables db access
+    :param statement_name: the name of the SQL statement
+    :param with_event: indicates whether to send an event to EventBridge
+    :param timeout: how long in seconds to wait for a response, if 0 don't wait
+    :param poll_interval: how often in seconds to check the query status
+    """
+
+    template_fields = (
+        'cluster_identifier',
+        'database',
+        'sql',
+        'db_user',
+        'parameters',
+    )
+    template_ext = ('.sql',)
+    template_fields_renderers = {'sql': 'sql'}
+
+    def __init__(
+        self,
+        database: str,
+        sql: str,
+        cluster_identifier: Optional[str] = "",
+        db_user: Optional[str] = "",
+        parameters: Optional[list] = None,
+        secret_arn: Optional[str] = "",
+        statement_name: Optional[str] = "",
+        with_event: bool = False,
+        timeout: int = 300,
+        poll_interval: int = 10,
+        aws_conn_id: str = 'aws_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.aws_conn_id = aws_conn_id
+        self.sql = sql
+        self.parameters = parameters
+        self.cluster_identifier = cluster_identifier
+        self.database = database
+        self.db_user = db_user
+        self.secret_arn = secret_arn
+        self.statement_name = statement_name
+        self.with_event = with_event
+        self.timeout = timeout
+        self.statement_id = None
+        if poll_interval > 0:
+            self.poll_interval = poll_interval
+        else:
+            self.log.warning(
+                "Invalid poll_interval:",
+                poll_interval,
+            )
+
+    @cached_property
+    def hook(self) -> RedshiftDataHook:
+        """Create and return an RedshiftDataHook."""
+        return RedshiftDataHook(aws_conn_id=self.aws_conn_id)
+
+    def execute_query(self):
+        resp = self.hook.execute_statement(
+            cluster_identifier=self.cluster_identifier,
+            database=self.database,
+            db_user=self.db_user,
+            sql=self.sql,
+            parameters=self.parameters,
+            secret_arn=self.secret_arn,
+            statement_name=self.statement_name,
+            with_event=self.with_event,
+        )
+        return resp['Id']
+
+    def wait_for_results(self, statement_id):
+        elapsed = 0
+        while elapsed < self.timeout:
+            self.log.info("Polling", statement_id)
+            resp = self.hook.describe_statement(
+                statement_id=statement_id,
+            )
+            status = resp['Status']
+            if status == 'FINISHED':
+                return status
+            elif status == 'FAILED':
+                raise ValueError(f"RedshiftDataHook.describe_statement {status}")

Review comment:
   ```suggestion
                raise ValueError(f"Statement {statement_id!r} terminated with status {status}.")
   ```

##########
File path: tests/providers/amazon/aws/hooks/test_redshift_data.py
##########
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+from unittest import mock
+
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+

Review comment:
       This is a very simple hook / operator and I think using moto may be overcomplicating it. We can just write simple unit tests with conventional mocking techniques; there are very few moving parts. Basically we just need to check that (1) `execute_statement` is called with the operator params, (2) execution statuses are handled properly, (3) the no-wait behavior, and (4) the XCom behavior. If moto is still holding you back, I would not worry about it; see the sketch below.
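   
   For example (a sketch only; it assumes the constructor defaults and the `execute_statement` call shown in the operator diff above):
   
   ```python
   from unittest import mock
   
   from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
   
   
   @mock.patch("airflow.providers.amazon.aws.operators.redshift_data.RedshiftDataHook")
   def test_execute_statement_called_with_operator_params(mock_hook):
       # Stub the two hook calls the operator makes.
       mock_hook.return_value.execute_statement.return_value = {"Id": "statement-id"}
       mock_hook.return_value.describe_statement.return_value = {"Status": "FINISHED"}
   
       operator = RedshiftDataOperator(
           task_id="test_task",
           database="dev",
           sql="SELECT 1;",
           cluster_identifier="my-cluster",
       )
       operator.execute({})
   
       # 1. execute_statement is called with the operator params.
       mock_hook.return_value.execute_statement.assert_called_once_with(
           cluster_identifier="my-cluster",
           database="dev",
           db_user="",
           sql="SELECT 1;",
           parameters=None,
           secret_arn="",
           statement_name="",
           with_event=False,
       )
   ```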

##########
File path: airflow/providers/amazon/aws/operators/redshift_data.py
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param cluster_identifier: unique identifier of a cluster
+    :param database: the name of the database
+    :param sql: the SQL statement text to run
+    :param db_user: the database user name
+    :param parameters: the parameters for the SQL statement
+    :param secret_arn: the name or ARN of the secret that enables db access
+    :param statement_name: the name of the SQL statement
+    :param with_event: indicates whether to send an event to EventBridge
+    :param timeout: how long in seconds to wait for a response, if 0 don't wait
+    :param poll_interval: how often in seconds to check the query status
+    """
+
+    template_fields = (
+        'cluster_identifier',
+        'database',
+        'sql',
+        'db_user',
+        'parameters',
+    )
+    template_ext = ('.sql',)
+    template_fields_renderers = {'sql': 'sql'}
+
+    def __init__(
+        self,
+        database: str,
+        sql: str,
+        cluster_identifier: Optional[str] = "",
+        db_user: Optional[str] = "",
+        parameters: Optional[list] = None,
+        secret_arn: Optional[str] = "",
+        statement_name: Optional[str] = "",
+        with_event: bool = False,
+        timeout: int = 300,
+        poll_interval: int = 10,
+        aws_conn_id: str = 'aws_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.aws_conn_id = aws_conn_id
+        self.sql = sql
+        self.parameters = parameters
+        self.cluster_identifier = cluster_identifier
+        self.database = database
+        self.db_user = db_user
+        self.secret_arn = secret_arn
+        self.statement_name = statement_name
+        self.with_event = with_event
+        self.timeout = timeout
+        self.statement_id = None
+        if poll_interval > 0:
+            self.poll_interval = poll_interval
+        else:
+            self.log.warning(
+                "Invalid poll_interval:",
+                poll_interval,
+            )
+
+    @cached_property
+    def hook(self) -> RedshiftDataHook:
+        """Create and return an RedshiftDataHook."""
+        return RedshiftDataHook(aws_conn_id=self.aws_conn_id)
+
+    def execute_query(self):
+        resp = self.hook.execute_statement(
+            cluster_identifier=self.cluster_identifier,
+            database=self.database,
+            db_user=self.db_user,
+            sql=self.sql,
+            parameters=self.parameters,
+            secret_arn=self.secret_arn,
+            statement_name=self.statement_name,
+            with_event=self.with_event,
+        )
+        return resp['Id']
+
+    def wait_for_results(self, statement_id):
+        elapsed = 0
+        while elapsed < self.timeout:
+            self.log.info("Polling", statement_id)
+            resp = self.hook.describe_statement(
+                statement_id=statement_id,
+            )
+            status = resp['Status']
+            if status == 'FINISHED':
+                return status
+            elif status == 'FAILED':
+                raise ValueError(f"RedshiftDataHook.describe_statement {status}")
+            elif status == 'ABORTED':
+                raise ValueError(f"Query {status}")

Review comment:
   ```suggestion
                raise ValueError(f"Statement {statement_id!r} terminated with status {status}.")
   ```

##########
File path: airflow/providers/amazon/aws/operators/redshift_data.py
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param cluster_identifier: unique identifier of a cluster
+    :param database: the name of the database
+    :param sql: the SQL statement text to run
+    :param db_user: the database user name
+    :param parameters: the parameters for the SQL statement
+    :param secret_arn: the name or ARN of the secret that enables db access
+    :param statement_name: the name of the SQL statement
+    :param with_event: indicates whether to send an event to EventBridge
+    :param timeout: how long in seconds to wait for a response, if 0 don't wait
+    :param poll_interval: how often in seconds to check the query status
+    """
+
+    template_fields = (
+        'cluster_identifier',
+        'database',
+        'sql',
+        'db_user',
+        'parameters',
+    )
+    template_ext = ('.sql',)
+    template_fields_renderers = {'sql': 'sql'}
+
+    def __init__(
+        self,
+        database: str,
+        sql: str,
+        cluster_identifier: Optional[str] = "",
+        db_user: Optional[str] = "",
+        parameters: Optional[list] = None,
+        secret_arn: Optional[str] = "",
+        statement_name: Optional[str] = "",
+        with_event: bool = False,
+        timeout: int = 300,

Review comment:
       tasks already have an `execution_timeout`
   
   why add another one? See the usage sketch below.
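   
   For illustration, a hypothetical usage that leans on the built-in task timeout (task_id and values made up):
   
   ```python
   from datetime import timedelta
   
   from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
   
   run_sql = RedshiftDataOperator(
       task_id="run_sql",
       database="dev",
       sql="SELECT 1;",
       # Enforced by Airflow itself: the task fails with AirflowTaskTimeout
       # if it runs longer than this.
       execution_timeout=timedelta(minutes=5),
   )
   ```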

##########
File path: airflow/providers/amazon/aws/operators/redshift_data.py
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param cluster_identifier: unique identifier of a cluster
+    :param database: the name of the database
+    :param sql: the SQL statement text to run
+    :param db_user: the database user name
+    :param parameters: the parameters for the SQL statement
+    :param secret_arn: the name or ARN of the secret that enables db access
+    :param statement_name: the name of the SQL statement
+    :param with_event: indicates whether to send an event to EventBridge
+    :param timeout: how long in seconds to wait for a response, if 0 don't wait
+    :param poll_interval: how often in seconds to check the query status
+    """
+
+    template_fields = (
+        'cluster_identifier',
+        'database',
+        'sql',
+        'db_user',
+        'parameters',
+    )
+    template_ext = ('.sql',)
+    template_fields_renderers = {'sql': 'sql'}
+
+    def __init__(
+        self,
+        database: str,
+        sql: str,
+        cluster_identifier: Optional[str] = "",
+        db_user: Optional[str] = "",
+        parameters: Optional[list] = None,
+        secret_arn: Optional[str] = "",
+        statement_name: Optional[str] = "",
+        with_event: bool = False,
+        timeout: int = 300,
+        poll_interval: int = 10,
+        aws_conn_id: str = 'aws_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.aws_conn_id = aws_conn_id
+        self.sql = sql
+        self.parameters = parameters
+        self.cluster_identifier = cluster_identifier
+        self.database = database
+        self.db_user = db_user
+        self.secret_arn = secret_arn
+        self.statement_name = statement_name
+        self.with_event = with_event
+        self.timeout = timeout
+        self.statement_id = None
+        if poll_interval > 0:
+            self.poll_interval = poll_interval
+        else:
+            self.log.warning(
+                "Invalid poll_interval:",
+                poll_interval,
+            )
+
+    @cached_property
+    def hook(self) -> RedshiftDataHook:
+        """Create and return an RedshiftDataHook."""
+        return RedshiftDataHook(aws_conn_id=self.aws_conn_id)
+
+    def execute_query(self):
+        resp = self.hook.execute_statement(
+            cluster_identifier=self.cluster_identifier,
+            database=self.database,
+            db_user=self.db_user,
+            sql=self.sql,
+            parameters=self.parameters,
+            secret_arn=self.secret_arn,
+            statement_name=self.statement_name,
+            with_event=self.with_event,
+        )
+        return resp['Id']
+
+    def wait_for_results(self, statement_id):
+        elapsed = 0
+        while elapsed < self.timeout:

Review comment:
       We can lean on `execution_timeout` here instead:
   ```suggestion
           while True:
   ```

##########
File path: airflow/providers/amazon/aws/operators/redshift_data.py
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param cluster_identifier: unique identifier of a cluster
+    :param database: the name of the database
+    :param sql: the SQL statement text to run
+    :param db_user: the database user name
+    :param parameters: the parameters for the SQL statement
+    :param secret_arn: the name or ARN of the secret that enables db access
+    :param statement_name: the name of the SQL statement
+    :param with_event: indicates whether to send an event to EventBridge
+    :param timeout: how long in seconds to wait for a response, if 0 don't wait
+    :param poll_interval: how often in seconds to check the query status
+    """
+
+    template_fields = (
+        'cluster_identifier',
+        'database',
+        'sql',
+        'db_user',
+        'parameters',
+    )
+    template_ext = ('.sql',)
+    template_fields_renderers = {'sql': 'sql'}
+
+    def __init__(
+        self,
+        database: str,
+        sql: str,
+        cluster_identifier: Optional[str] = "",
+        db_user: Optional[str] = "",
+        parameters: Optional[list] = None,
+        secret_arn: Optional[str] = "",
+        statement_name: Optional[str] = "",
+        with_event: bool = False,
+        timeout: int = 300,
+        poll_interval: int = 10,
+        aws_conn_id: str = 'aws_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.aws_conn_id = aws_conn_id
+        self.sql = sql
+        self.parameters = parameters
+        self.cluster_identifier = cluster_identifier
+        self.database = database
+        self.db_user = db_user
+        self.secret_arn = secret_arn
+        self.statement_name = statement_name
+        self.with_event = with_event
+        self.timeout = timeout
+        self.statement_id = None
+        if poll_interval > 0:
+            self.poll_interval = poll_interval
+        else:
+            self.log.warning(
+                "Invalid poll_interval:",
+                poll_interval,
+            )
+
+    @cached_property
+    def hook(self) -> RedshiftDataHook:
+        """Create and return an RedshiftDataHook."""
+        return RedshiftDataHook(aws_conn_id=self.aws_conn_id)
+
+    def execute_query(self):
+        resp = self.hook.execute_statement(
+            cluster_identifier=self.cluster_identifier,
+            database=self.database,
+            db_user=self.db_user,
+            sql=self.sql,
+            parameters=self.parameters,
+            secret_arn=self.secret_arn,
+            statement_name=self.statement_name,
+            with_event=self.with_event,
+        )
+        return resp['Id']
+
+    def wait_for_results(self, statement_id):
+        elapsed = 0
+        while elapsed < self.timeout:
+            self.log.info("Polling", statement_id)
+            resp = self.hook.describe_statement(
+                statement_id=statement_id,
+            )
+            status = resp['Status']
+            if status == 'FINISHED':
+                return status
+            elif status == 'FAILED':
+                raise ValueError(f"RedshiftDataHook.describe_statement {status}")
+            elif status == 'ABORTED':
+                raise ValueError(f"Query {status}")

Review comment:
       I would include the statement id in the message

##########
File path: airflow/providers/amazon/aws/operators/redshift_data.py
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param cluster_identifier: unique identifier of a cluster
+    :param database: the name of the database
+    :param sql: the SQL statement text to run
+    :param db_user: the database user name
+    :param parameters: the parameters for the SQL statement
+    :param secret_arn: the name or ARN of the secret that enables db access
+    :param statement_name: the name of the SQL statement
+    :param with_event: indicates whether to send an event to EventBridge
+    :param timeout: how long in seconds to wait for a response, if 0 don't wait

Review comment:
       It's not at all obvious what `timeout` means here; it also implies 'sync' behavior.
   
   I would make this a boolean (since there's already a timeout in the base operator, you can chop this one) and call it `nowait` or something.
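   
   Rough sketch of what I mean (`nowait` is a placeholder name; `execute_query` and `wait_for_results` are the methods from the diff above):
   
   ```python
   from airflow.models import BaseOperator
   
   
   class RedshiftDataOperator(BaseOperator):
       def __init__(self, database: str, sql: str, nowait: bool = False, **kwargs) -> None:
           super().__init__(**kwargs)
           self.database = database
           self.sql = sql
           # Fire-and-forget: submit the statement and return without polling.
           self.nowait = nowait
   
       def execute(self, context: dict) -> None:
           # execute_query() and wait_for_results() as defined in the diff above.
           self.statement_id = self.execute_query()
           if not self.nowait:
               # No custom timeout: the task-level execution_timeout bounds the polling.
               self.wait_for_results(self.statement_id)
   ```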

##########
File path: airflow/providers/amazon/aws/operators/redshift_data.py
##########
@@ -0,0 +1,151 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param cluster_identifier: unique identifier of a cluster
+    :param database: the name of the database
+    :param sql: the SQL statement text to run
+    :param db_user: the database user name
+    :param parameters: the parameters for the SQL statement
+    :param secret_arn: the name or ARN of the secret that enables db access
+    :param statement_name: the name of the SQL statement
+    :param with_event: indicates whether to send an event to EventBridge
+    :param timeout: how long in seconds to wait for a response, if 0 don't wait
+    :param poll_interval: how often in seconds to check the query status
+    """
+
+    template_fields = (
+        'cluster_identifier',
+        'database',
+        'sql',
+        'db_user',
+        'parameters',
+    )
+    template_ext = ('.sql',)
+    template_fields_renderers = {'sql': 'sql'}
+
+    def __init__(
+        self,
+        database: str,
+        sql: str,
+        cluster_identifier: Optional[str] = "",
+        db_user: Optional[str] = "",
+        parameters: Optional[list] = None,
+        secret_arn: Optional[str] = "",
+        statement_name: Optional[str] = "",
+        with_event: bool = False,
+        timeout: int = 300,
+        poll_interval: int = 10,
+        aws_conn_id: str = 'aws_default',
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.aws_conn_id = aws_conn_id
+        self.sql = sql
+        self.parameters = parameters
+        self.cluster_identifier = cluster_identifier
+        self.database = database
+        self.db_user = db_user
+        self.secret_arn = secret_arn
+        self.statement_name = statement_name
+        self.with_event = with_event
+        self.timeout = timeout
+        self.statement_id = None
+        if poll_interval > 0:
+            self.poll_interval = poll_interval
+        else:
+            self.log.warning(
+                "Invalid poll_interval:",
+                poll_interval,
+            )
+
+    @cached_property
+    def hook(self) -> RedshiftDataHook:
+        """Create and return an RedshiftDataHook."""
+        return RedshiftDataHook(aws_conn_id=self.aws_conn_id)
+
+    def execute_query(self):
+        resp = self.hook.execute_statement(
+            cluster_identifier=self.cluster_identifier,
+            database=self.database,
+            db_user=self.db_user,
+            sql=self.sql,
+            parameters=self.parameters,
+            secret_arn=self.secret_arn,
+            statement_name=self.statement_name,
+            with_event=self.with_event,
+        )
+        return resp['Id']
+
+    def wait_for_results(self, statement_id):
+        elapsed = 0
+        while elapsed < self.timeout:
+            self.log.info("Polling", statement_id)
+            resp = self.hook.describe_statement(
+                statement_id=statement_id,
+            )
+            status = resp['Status']
+            if status == 'FINISHED':
+                return status
+            elif status == 'FAILED':
+                raise ValueError(f"RedshiftDataHook.describe_statement {status}")
+            elif status == 'ABORTED':
+                raise ValueError(f"Query {status}")
+            else:
+                self.log.info(f"Query {status}")
+            sleep(self.poll_interval)
+            elapsed = elapsed + self.poll_interval
+
+        raise AirflowException("Timeout. The operation could not be completed within the allotted time.")
+
+    def execute(self, context: dict) -> None:
+        """Execute a statement against Amazon Redshift"""
+        self.log.info(f"Executing statement: {self.sql}")
+
+        self.statement_id = self.execute_query()
+
+        if self.timeout > 0:

Review comment:
   ```suggestion
           if self.nowait is False:
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org