Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/21 20:45:50 UTC

[GitHub] [airflow] pankajastro opened a new pull request, #25857: Add redshift create cluster snapshot operator

pankajastro opened a new pull request, #25857:
URL: https://github.com/apache/airflow/pull/25857

   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pankajastro commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952890439


##########
tests/providers/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -99,6 +103,32 @@ def test_create_multi_node_cluster(self, mock_get_conn):
         )
 
 
+class TestRedshiftCreateClusterSnapshotOperator:
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
+    def test_create_cluster_snapshot_is_called_when_cluster_is_available(
+        self, mock_get_conn, mock_cluster_status
+    ):
+        mock_cluster_status.return_value = "available"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        create_snapshot.execute(None)
+        mock_get_conn.return_value.create_cluster_snapshot.assert_called_once_with(
+            ClusterIdentifier='test_cluster',
+            SnapshotIdentifier="test_snapshot",
+        )
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    def test_raise_exception_when_cluster_is_not_available(self, mock_cluster_status):
+        mock_cluster_status.return_value = "paused"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        with pytest.raises(AirflowException):
+            create_snapshot.execute(None)
+

Review Comment:
   Add tests to check that the waiter is not called (f1cfd5a07bdb1ab27afd50f232173bd143923333) and that it is called (0528e9d0b867d8c6897db1403ec7f523e60cead7).
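   A minimal sketch of what the two waiter tests could look like. It does not import Airflow: `run_create_snapshot` is a hypothetical stand-in that mirrors the operator logic shown in the review hunks (status check, `create_cluster_snapshot`, optional wait). It assumes the wait goes through boto3's `snapshot_available` Redshift waiter, which the hunks do not show, and it redefines `AirflowException` locally only to stay self-contained.

```python
from unittest import mock


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (keeps the sketch self-contained)."""


def run_create_snapshot(
    hook,
    cluster_identifier,
    snapshot_identifier,
    retention_period=-1,
    wait_for_completion=False,
    poll_interval=5.0,
    max_attempts=20,  # hypothetical; the hunks do not show a max-attempts setting
):
    """Hypothetical mirror of the operator's execute() logic, not the merged code."""
    state = hook.cluster_status(cluster_identifier=cluster_identifier)
    if state != "available":
        raise AirflowException(f"Redshift cluster must be in available state, got {state!r}")
    conn = hook.get_conn()
    conn.create_cluster_snapshot(
        ClusterIdentifier=cluster_identifier,
        SnapshotIdentifier=snapshot_identifier,
        ManualSnapshotRetentionPeriod=retention_period,
    )
    if wait_for_completion:
        # Assumption: waiting uses boto3's "snapshot_available" Redshift waiter.
        conn.get_waiter("snapshot_available").wait(
            ClusterIdentifier=cluster_identifier,
            SnapshotIdentifier=snapshot_identifier,
            WaiterConfig={"Delay": poll_interval, "MaxAttempts": max_attempts},
        )


def make_hook(state="available"):
    """Build a MagicMock hook whose cluster_status() returns `state`."""
    hook = mock.MagicMock()
    hook.cluster_status.return_value = state
    return hook


def test_waiter_not_called_by_default():
    hook = make_hook()
    run_create_snapshot(hook, "test_cluster", "test_snapshot")
    hook.get_conn.return_value.get_waiter.assert_not_called()


def test_waiter_called_when_wait_for_completion():
    hook = make_hook()
    run_create_snapshot(hook, "test_cluster", "test_snapshot", wait_for_completion=True)
    waiter_factory = hook.get_conn.return_value.get_waiter
    waiter_factory.assert_called_once_with("snapshot_available")
    waiter_factory.return_value.wait.assert_called_once()


def test_raises_when_cluster_not_available():
    try:
        run_create_snapshot(make_hook("paused"), "test_cluster", "test_snapshot")
    except AirflowException as err:
        assert "paused" in str(err)
    else:
        raise AssertionError("expected AirflowException")
```

   In the Airflow test suite the last test would more naturally use `pytest.raises(AirflowException)`, as the existing tests in this file do.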





[GitHub] [airflow] pankajastro commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952890439


##########
tests/providers/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -99,6 +103,32 @@ def test_create_multi_node_cluster(self, mock_get_conn):
         )
 
 
+class TestRedshiftCreateClusterSnapshotOperator:
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
+    def test_create_cluster_snapshot_is_called_when_cluster_is_available(
+        self, mock_get_conn, mock_cluster_status
+    ):
+        mock_cluster_status.return_value = "available"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        create_snapshot.execute(None)
+        mock_get_conn.return_value.create_cluster_snapshot.assert_called_once_with(
+            ClusterIdentifier='test_cluster',
+            SnapshotIdentifier="test_snapshot",
+        )
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    def test_raise_exception_when_cluster_is_not_available(self, mock_cluster_status):
+        mock_cluster_status.return_value = "paused"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        with pytest.raises(AirflowException):
+            create_snapshot.execute(None)
+

Review Comment:
   Added tests to check that the waiter is not called (f1cfd5a07bdb1ab27afd50f232173bd143923333) and that it is called (0528e9d0b867d8c6897db1403ec7f523e60cead7).





[GitHub] [airflow] pankajastro commented on pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
pankajastro commented on PR #25857:
URL: https://github.com/apache/airflow/pull/25857#issuecomment-1221891937

   > Hi @pankajastro. I think this is a good new operator, but new functionality could be added without many separate operators. For example, in RDS I added the parameter `db_type`, which can be either `'instance'` or `'cluster'`. It makes it possible to use a single operator for very similar jobs. It would be good even if you adapted code (a kind of copy-paste) from these PRs:
   > 
   > * [Added Hook for Amazon RDS #20642](https://github.com/apache/airflow/pull/20642)
   > * [Added AWS RDS operators #20907](https://github.com/apache/airflow/pull/20907)
   > * [Added AWS RDS sensors #21231](https://github.com/apache/airflow/pull/21231)
   
   Hmm, I like the idea, but for Redshift we already have operators like `RedshiftPauseClusterOperator`, `RedshiftResumeClusterOperator`, etc. I want to add `RedshiftCreateClusterSnapshotOperator` and `RedshiftDeleteClusterSnapshotOperator`. I could consolidate these two into one operator, `RedshiftClusterSnapshotOperator`. Or do you have thoughts on which existing operator this functionality could be added to?
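   A rough sketch of the consolidated alternative being discussed, assuming a hypothetical `action` parameter analogous to `db_type` in the RDS operators. `RedshiftClusterSnapshotSketch` is not an Airflow class; `conn` stands in for a boto3 Redshift client, whose `create_cluster_snapshot` and `delete_cluster_snapshot` methods are called directly.

```python
from unittest import mock


class RedshiftClusterSnapshotSketch:
    """Hypothetical single snapshot operator that dispatches on ``action``.

    Not an Airflow class: ``execute`` takes ``conn``, a stand-in for a boto3
    Redshift client, instead of an Airflow context.
    """

    ACTIONS = ("create", "delete")

    def __init__(self, *, action, cluster_identifier, snapshot_identifier, retention_period=-1):
        if action not in self.ACTIONS:
            raise ValueError(f"action must be one of {self.ACTIONS}, got {action!r}")
        self.action = action
        self.cluster_identifier = cluster_identifier
        self.snapshot_identifier = snapshot_identifier
        self.retention_period = retention_period  # only meaningful for "create"

    def execute(self, conn):
        if self.action == "create":
            return conn.create_cluster_snapshot(
                ClusterIdentifier=self.cluster_identifier,
                SnapshotIdentifier=self.snapshot_identifier,
                ManualSnapshotRetentionPeriod=self.retention_period,
            )
        return conn.delete_cluster_snapshot(
            SnapshotIdentifier=self.snapshot_identifier,
            SnapshotClusterIdentifier=self.cluster_identifier,
        )


# Usage with a mocked client:
conn = mock.MagicMock()
RedshiftClusterSnapshotSketch(
    action="delete", cluster_identifier="my_cluster", snapshot_identifier="my_snapshot"
).execute(conn)
conn.delete_cluster_snapshot.assert_called_once_with(
    SnapshotIdentifier="my_snapshot", SnapshotClusterIdentifier="my_cluster"
)
```

   The two branches accept different parameters (`retention_period` has no meaning for a delete), which is one argument for keeping two small operators instead.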




[GitHub] [airflow] josh-fell commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952742586


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -19,6 +19,7 @@
 
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
+from build.lib.airflow.exceptions import AirflowException

Review Comment:
   ```suggestion
   from airflow.exceptions import AirflowException
   ```





[GitHub] [airflow] vincbeck commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952891171


##########
tests/providers/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -128,6 +132,23 @@ def test_raise_exception_when_cluster_is_not_available(self, mock_cluster_status
         with pytest.raises(AirflowException):
             create_snapshot.execute(None)
 
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
+    def test_create_cluster_snapshot_with_no_wait(self, mock_get_conn, mock_cluster_status):

Review Comment:
   ```suggestion
       def test_create_cluster_snapshot_with_wait(self, mock_get_conn, mock_cluster_status):
   ```



##########
tests/providers/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -111,12 +111,16 @@ def test_create_cluster_snapshot_is_called_when_cluster_is_available(
     ):
         mock_cluster_status.return_value = "available"
         create_snapshot = RedshiftCreateClusterSnapshotOperator(
-            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+            task_id="test_snapshot",
+            cluster_identifier="test_cluster",
+            snapshot_identifier="test_snapshot",
+            retention_period=1,
         )
         create_snapshot.execute(None)
         mock_get_conn.return_value.create_cluster_snapshot.assert_called_once_with(
             ClusterIdentifier='test_cluster',
             SnapshotIdentifier="test_snapshot",
+            ManualSnapshotRetentionPeriod=1,
         )
 

Review Comment:
   I would check here that the waiter is not called, since the default value of `wait_for_completion` is `False`.





[GitHub] [airflow] pankajastro commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952928160


##########
tests/providers/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -111,12 +111,16 @@ def test_create_cluster_snapshot_is_called_when_cluster_is_available(
     ):
         mock_cluster_status.return_value = "available"
         create_snapshot = RedshiftCreateClusterSnapshotOperator(
-            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+            task_id="test_snapshot",
+            cluster_identifier="test_cluster",
+            snapshot_identifier="test_snapshot",
+            retention_period=1,
         )
         create_snapshot.execute(None)
         mock_get_conn.return_value.create_cluster_snapshot.assert_called_once_with(
             ClusterIdentifier='test_cluster',
             SnapshotIdentifier="test_snapshot",
+            ManualSnapshotRetentionPeriod=1,
         )
 

Review Comment:
   sure





[GitHub] [airflow] eladkal commented on pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
eladkal commented on PR #25857:
URL: https://github.com/apache/airflow/pull/25857#issuecomment-1222805990

   > Hmm, I like the idea, but for Redshift we already have operators like `RedshiftPauseClusterOperator`, `RedshiftResumeClusterOperator`, etc. I want to add `RedshiftCreateClusterSnapshotOperator` and `RedshiftDeleteClusterSnapshotOperator`. I could consolidate these two into one operator, `RedshiftClusterSnapshotOperator`
   
   We can deprecate operators if there is a need.
   If we can generalize and make things simpler to use, I think that's better, but this is a judgment call on a case-by-case basis.
   @ferruzzi @o-nikolas @vincbeck can you share your thoughts on this?
   
   While we are at it, can we also address the TODO (I'm not sure what the intent was):
   https://github.com/apache/airflow/blob/9021c2b97d6c662f7a8b7380f74af5b0739a3f50/airflow/providers/amazon/aws/hooks/redshift_cluster.py#L76-L77
   




[GitHub] [airflow] vincbeck commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952759973


##########
tests/providers/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -99,6 +103,32 @@ def test_create_multi_node_cluster(self, mock_get_conn):
         )
 
 
+class TestRedshiftCreateClusterSnapshotOperator:
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
+    def test_create_cluster_snapshot_is_called_when_cluster_is_available(
+        self, mock_get_conn, mock_cluster_status
+    ):
+        mock_cluster_status.return_value = "available"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        create_snapshot.execute(None)
+        mock_get_conn.return_value.create_cluster_snapshot.assert_called_once_with(
+            ClusterIdentifier='test_cluster',
+            SnapshotIdentifier="test_snapshot",
+        )
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    def test_raise_exception_when_cluster_is_not_available(self, mock_cluster_status):
+        mock_cluster_status.return_value = "paused"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        with pytest.raises(AirflowException):
+            create_snapshot.execute(None)
+

Review Comment:
   Maybe add a third test for when `wait_for_completion` is `False`. You would essentially check that the waiter is not called.





[GitHub] [airflow] vincbeck commented on pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
vincbeck commented on PR #25857:
URL: https://github.com/apache/airflow/pull/25857#issuecomment-1222877886

   I would rather have several "small" operators than one that handles multiple use cases. There can be exceptions, but here I don't mind having `RedshiftCreateClusterSnapshotOperator` and `RedshiftDeleteClusterSnapshotOperator`.




[GitHub] [airflow] josh-fell commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r951885486


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -242,6 +243,63 @@ def execute(self, context: 'Context'):
         self.log.info(cluster)
 
 
+class RedshiftCreateClusterSnapshotOperator(BaseOperator):
+    """
+    Creates a manual snapshot of the specified cluster. The cluster must be in the available state
+
+    :param snapshot_identifier: A unique identifier for the snapshot that you are requesting
+    :param cluster_identifier: The cluster identifier for which you want a snapshot
+    :param retention_period: The number of days that a manual snapshot is retained
+    :param wait_for_completion: Whether wait for cluster to be in ``available`` state
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state
+    """
+
+    def __init__(
+        self,
+        *,
+        snapshot_identifier: str,
+        cluster_identifier: str,
+        retention_period: int = -1,
+        wait_for_completion: bool = False,
+        poll_interval: float = 5.0,
+        aws_conn_id: str = "aws_default",

Review Comment:
   Can you add this parameter to the docstring please?



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -242,6 +243,63 @@ def execute(self, context: 'Context'):
         self.log.info(cluster)
 
 
+class RedshiftCreateClusterSnapshotOperator(BaseOperator):
+    """
+    Creates a manual snapshot of the specified cluster. The cluster must be in the available state
+
+    :param snapshot_identifier: A unique identifier for the snapshot that you are requesting
+    :param cluster_identifier: The cluster identifier for which you want a snapshot
+    :param retention_period: The number of days that a manual snapshot is retained
+    :param wait_for_completion: Whether wait for cluster to be in ``available`` state
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state
+    """
+
+    def __init__(
+        self,
+        *,
+        snapshot_identifier: str,
+        cluster_identifier: str,
+        retention_period: int = -1,

Review Comment:
   It might be useful to make a note about what the default value is here and what -1 means. I assume it means to never delete the snapshot, but better to be explicit than implicit. WDYT?
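   The revised docstring later in this thread states that -1 means the manual snapshot is retained indefinitely. A small sketch of validating the value before calling the API, assuming the range documented for `ManualSnapshotRetentionPeriod` in the Redshift API reference (-1, or 1 to 3653 days). `validate_retention_period` is a hypothetical helper, not part of the PR.

```python
INDEFINITE = -1  # retain the manual snapshot indefinitely
MIN_DAYS, MAX_DAYS = 1, 3653  # assumed documented range for ManualSnapshotRetentionPeriod


def validate_retention_period(retention_period: int) -> int:
    """Hypothetical helper: return the value unchanged if the API should accept it."""
    if retention_period == INDEFINITE or MIN_DAYS <= retention_period <= MAX_DAYS:
        return retention_period
    raise ValueError(
        f"retention_period must be {INDEFINITE} (retain indefinitely) "
        f"or between {MIN_DAYS} and {MAX_DAYS} days, got {retention_period}"
    )
```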



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -242,6 +243,63 @@ def execute(self, context: 'Context'):
         self.log.info(cluster)
 
 
+class RedshiftCreateClusterSnapshotOperator(BaseOperator):
+    """
+    Creates a manual snapshot of the specified cluster. The cluster must be in the available state
+
+    :param snapshot_identifier: A unique identifier for the snapshot that you are requesting
+    :param cluster_identifier: The cluster identifier for which you want a snapshot
+    :param retention_period: The number of days that a manual snapshot is retained
+    :param wait_for_completion: Whether wait for cluster to be in ``available`` state
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state
+    """
+
+    def __init__(
+        self,
+        *,
+        snapshot_identifier: str,
+        cluster_identifier: str,
+        retention_period: int = -1,
+        wait_for_completion: bool = False,
+        poll_interval: float = 5.0,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.snapshot_identifier = snapshot_identifier
+        self.cluster_identifier = cluster_identifier
+        self.retention_period = retention_period
+        self.wait_for_completion = wait_for_completion
+        self.poll_interval = poll_interval
+        self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)
+
+    def execute(self, context: Context) -> Any:

Review Comment:
   ```suggestion
       def execute(self, context: "Context") -> Any:
   ```
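   For context on the quoted annotation: assuming, as the existing `'Context'` annotations in this file suggest, that `Context` is only imported under `typing.TYPE_CHECKING`, the name does not exist at runtime, so the annotation has to be a string. A minimal illustration (`ExampleOperator` is hypothetical):

```python
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Evaluated only by static type checkers such as mypy; skipped at runtime,
    # so Airflow does not need to be installed to run this snippet.
    from airflow.utils.context import Context


class ExampleOperator:
    # The quoted annotation is stored as a plain string and never resolved at
    # runtime. An unquoted `context: Context` would raise NameError when this
    # method is defined on Python versions that evaluate annotations eagerly
    # (before 3.14, without `from __future__ import annotations`).
    def execute(self, context: "Context") -> Any:
        return f"executed with context={context!r}"
```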



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -242,6 +243,63 @@ def execute(self, context: 'Context'):
         self.log.info(cluster)
 
 
+class RedshiftCreateClusterSnapshotOperator(BaseOperator):
+    """
+    Creates a manual snapshot of the specified cluster. The cluster must be in the available state
+
+    :param snapshot_identifier: A unique identifier for the snapshot that you are requesting
+    :param cluster_identifier: The cluster identifier for which you want a snapshot
+    :param retention_period: The number of days that a manual snapshot is retained
+    :param wait_for_completion: Whether wait for cluster to be in ``available`` state
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state
+    """
+
+    def __init__(
+        self,
+        *,
+        snapshot_identifier: str,
+        cluster_identifier: str,
+        retention_period: int = -1,
+        wait_for_completion: bool = False,
+        poll_interval: float = 5.0,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.snapshot_identifier = snapshot_identifier
+        self.cluster_identifier = cluster_identifier
+        self.retention_period = retention_period
+        self.wait_for_completion = wait_for_completion
+        self.poll_interval = poll_interval
+        self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)
+
+    def execute(self, context: Context) -> Any:
+        cluster_state = self.redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state != "available":
+            raise AirflowException(
+                f"Redshift cluster must be in available state."

Review Comment:
   ```suggestion
                   "Redshift cluster must be in available state. "
   ```
   Nit: the f-string isn't _technically_ needed, and this also adds a space between the concatenated lines.
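   A small illustration of the nit. Adjacent string literals are joined at compile time, so only a literal containing a `{placeholder}` needs the `f` prefix, and the separating space has to live inside one of the literals. The second literal below is hypothetical, since the hunk is cut off after the first line.

```python
cluster_state = "paused"

# Only the second literal has a placeholder, so only it needs the f prefix.
# The trailing space in the first literal separates the two sentences.
message = (
    "Redshift cluster must be in available state. "
    f"Current state: {cluster_state}"  # hypothetical second line
)

# Without the trailing space, the sentences run together.
run_together = (
    "Redshift cluster must be in available state."
    f"Current state: {cluster_state}"
)
```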





[GitHub] [airflow] kazanzhy commented on pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on PR #25857:
URL: https://github.com/apache/airflow/pull/25857#issuecomment-1221629241

   Hi @pankajastro.
   I think this is a good new operator, but new functionality could be added without many separate operators.
   For example, in RDS I added the parameter `db_type`, which can be either `'instance'` or `'cluster'`. It makes it possible to use a single operator for very similar jobs.
   It would be good even if you adapted code (a kind of copy-paste) from these PRs:
   - https://github.com/apache/airflow/pull/20642
   - https://github.com/apache/airflow/pull/20907
   - https://github.com/apache/airflow/pull/21231
   




[GitHub] [airflow] josh-fell commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952744804


##########
tests/providers/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -17,12 +17,16 @@
 
 from unittest import mock
 
+import pytest
+
 from airflow.providers.amazon.aws.operators.redshift_cluster import (
     RedshiftCreateClusterOperator,
+    RedshiftCreateClusterSnapshotOperator,
     RedshiftDeleteClusterOperator,
     RedshiftPauseClusterOperator,
     RedshiftResumeClusterOperator,
 )
+from build.lib.airflow.exceptions import AirflowException

Review Comment:
   ```suggestion
   from airflow.exceptions import AirflowException
   ```



##########
docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst:
##########
@@ -74,6 +74,20 @@ To pause an 'available' Amazon Redshift cluster you can use
     :start-after: [START howto_operator_redshift_pause_cluster]
     :end-before: [END howto_operator_redshift_pause_cluster]
 
+.. _howto/operator:RedshiftCreateClusterSnapshotOperator:
+
+Create an Amazon Redshift cluster snapshot
+==========================================
+
+To create Amazon Redshift cluster snapshot you can use
+:class:`RedshiftDeleteClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`

Review Comment:
   Actually, this section is about creating a snapshot, not creating a cluster. It just needs to be updated accordingly.





[GitHub] [airflow] josh-fell commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952744030


##########
docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst:
##########
@@ -74,6 +74,20 @@ To pause an 'available' Amazon Redshift cluster you can use
     :start-after: [START howto_operator_redshift_pause_cluster]
     :end-before: [END howto_operator_redshift_pause_cluster]
 
+.. _howto/operator:RedshiftCreateClusterSnapshotOperator:
+
+Create an Amazon Redshift cluster snapshot
+==========================================
+
+To create Amazon Redshift cluster snapshot you can use
+:class:`RedshiftDeleteClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`

Review Comment:
   ```suggestion
   :class:`RedshiftCreateClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`
   ```





[GitHub] [airflow] pankajastro commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952751168


##########
docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst:
##########
@@ -74,6 +74,20 @@ To pause an 'available' Amazon Redshift cluster you can use
     :start-after: [START howto_operator_redshift_pause_cluster]
     :end-before: [END howto_operator_redshift_pause_cluster]
 
+.. _howto/operator:RedshiftCreateClusterSnapshotOperator:
+
+Create an Amazon Redshift cluster snapshot
+==========================================
+
+To create Amazon Redshift cluster snapshot you can use
+:class:`RedshiftDeleteClusterOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`

Review Comment:
   Right, fixed in f55f4123298ebe4938e36694f4303641d3e408a0.





[GitHub] [airflow] potiuk merged pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #25857:
URL: https://github.com/apache/airflow/pull/25857




[GitHub] [airflow] vincbeck commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952759973


##########
tests/providers/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -99,6 +103,32 @@ def test_create_multi_node_cluster(self, mock_get_conn):
         )
 
 
+class TestRedshiftCreateClusterSnapshotOperator:
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
+    def test_create_cluster_snapshot_is_called_when_cluster_is_available(
+        self, mock_get_conn, mock_cluster_status
+    ):
+        mock_cluster_status.return_value = "available"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        create_snapshot.execute(None)
+        mock_get_conn.return_value.create_cluster_snapshot.assert_called_once_with(
+            ClusterIdentifier='test_cluster',
+            SnapshotIdentifier="test_snapshot",
+        )
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    def test_raise_exception_when_cluster_is_not_available(self, mock_cluster_status):
+        mock_cluster_status.return_value = "paused"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        with pytest.raises(AirflowException):
+            create_snapshot.execute(None)
+

Review Comment:
   Maybe add a third test for when `wait_for_completion` is `False`. You would essentially check that the waiter is not called.
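A minimal sketch of such a test follows. It assumes the operator waits through the boto3 client's `get_waiter("snapshot_available")`. To keep it self-contained it does not import Airflow: `create_snapshot` is a hypothetical stand-in for `RedshiftCreateClusterSnapshotOperator.execute()`, with the cluster-state check and `retention_period` omitted. A `unittest.mock.MagicMock` plays the Redshift client.

```python
from unittest import mock


def create_snapshot(client, cluster_identifier, snapshot_identifier,
                    wait_for_completion=False, poll_interval=5, max_attempts=20):
    """Hypothetical stand-in for the operator's execute() (not the PR's code)."""
    client.create_cluster_snapshot(
        SnapshotIdentifier=snapshot_identifier,
        ClusterIdentifier=cluster_identifier,
    )
    if wait_for_completion:
        # Only block on the boto3 waiter when the caller asked for it.
        client.get_waiter("snapshot_available").wait(
            ClusterIdentifier=cluster_identifier,
            SnapshotIdentifier=snapshot_identifier,
            WaiterConfig={"Delay": poll_interval, "MaxAttempts": max_attempts},
        )


def test_waiter_not_called_when_wait_for_completion_is_false():
    client = mock.MagicMock()
    create_snapshot(client, "test_cluster", "test_snapshot", wait_for_completion=False)
    client.create_cluster_snapshot.assert_called_once_with(
        SnapshotIdentifier="test_snapshot", ClusterIdentifier="test_cluster"
    )
    # The key assertion: no waiter is requested at all.
    client.get_waiter.assert_not_called()


def test_waiter_called_when_wait_for_completion_is_true():
    client = mock.MagicMock()
    create_snapshot(client, "test_cluster", "test_snapshot", wait_for_completion=True)
    client.get_waiter.assert_called_once_with("snapshot_available")
    client.get_waiter.return_value.wait.assert_called_once()
```

In the real test module you would probably patch `RedshiftHook.get_conn` as the existing tests do and assert on `mock_get_conn.return_value.get_waiter`. This only works if the operator obtains its waiter from `get_conn()`.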



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -242,6 +243,66 @@ def execute(self, context: 'Context'):
         self.log.info(cluster)
 
 
+class RedshiftCreateClusterSnapshotOperator(BaseOperator):
+    """
+    Creates a manual snapshot of the specified cluster. The cluster must be in the available state
+

Review Comment:
   ```suggestion
   
       .. seealso::
           For more information on how to use this operator, take a look at the guide:
           :ref:`howto/operator:RedshiftCreateClusterSnapshotOperator`
           
   ```



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -242,6 +243,66 @@ def execute(self, context: 'Context'):
         self.log.info(cluster)
 
 
+class RedshiftCreateClusterSnapshotOperator(BaseOperator):
+    """
+    Creates a manual snapshot of the specified cluster. The cluster must be in the available state
+
+    :param snapshot_identifier: A unique identifier for the snapshot that you are requesting
+    :param cluster_identifier: The cluster identifier for which you want a snapshot
+    :param retention_period: The number of days that a manual snapshot is retained.
+        If the value is -1, the manual snapshot is retained indefinitely.
+    :param wait_for_completion: Whether to wait for the cluster to be in the ``available`` state
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        The default connection id is ``aws_default``
+    """
+
+    def __init__(
+        self,
+        *,
+        snapshot_identifier: str,
+        cluster_identifier: str,
+        retention_period: int = -1,
+        wait_for_completion: bool = False,
+        poll_interval: float = 5.0,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.snapshot_identifier = snapshot_identifier
+        self.cluster_identifier = cluster_identifier
+        self.retention_period = retention_period
+        self.wait_for_completion = wait_for_completion
+        self.poll_interval = poll_interval
+        self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)
+
+    def execute(self, context: "Context") -> Any:
+        cluster_state = self.redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state != "available":
+            raise AirflowException(
+                "Redshift cluster must be in available state. "
+                f"Redshift cluster current state is {cluster_state}"
+            )
+
+        self.redshift_hook.create_cluster_snapshot(
+            cluster_identifier=self.cluster_identifier,
+            snapshot_identifier=self.snapshot_identifier,
+            retention_period=self.retention_period,
+        )
+
+        if self.wait_for_completion:
+            cluster_status: str = self.check_status()

Review Comment:
   You might want to use a waiter; it would make the code much simpler and cleaner. See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Waiter.SnapshotAvailable
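A minimal sketch of a waiter-based wait follows. It assumes a boto3 Redshift client, which in the operator would presumably come from `self.redshift_hook.get_conn()`. `snapshot_available` is the Redshift waiter linked above. `poll_interval` maps to the waiter's `Delay`. `max_attempts` is a hypothetical parameter added here to bound the total wait. `wait_for_snapshot` itself is an illustrative helper, not part of the PR.

```python
from typing import Any


def wait_for_snapshot(
    client: Any,
    cluster_identifier: str,
    snapshot_identifier: str,
    poll_interval: int = 5,
    max_attempts: int = 20,
) -> None:
    """Block until the snapshot reaches the ``available`` state.

    ``client`` is assumed to be a boto3 Redshift client. The waiter polls
    DescribeClusterSnapshots every ``poll_interval`` seconds. It raises
    ``botocore.exceptions.WaiterError`` if the snapshot is still not available
    after ``max_attempts`` polls, or if the waiter hits one of its failure states.
    """
    waiter = client.get_waiter("snapshot_available")
    waiter.wait(
        ClusterIdentifier=cluster_identifier,
        SnapshotIdentifier=snapshot_identifier,
        # WaiterConfig replaces a hand-rolled poll loop with time.sleep().
        WaiterConfig={"Delay": poll_interval, "MaxAttempts": max_attempts},
    )
```

Usage, assuming AWS credentials are configured: `wait_for_snapshot(boto3.client("redshift"), "test_cluster", "test_snapshot")`. Compared with a custom `check_status()` loop, the waiter owns the retry loop, the delay and the terminal failure handling. The operator then only has to decide whether to call it.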





[GitHub] [airflow] pankajastro commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
pankajastro commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952888426


##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -242,6 +243,66 @@ def execute(self, context: 'Context'):
         self.log.info(cluster)
 
 
+class RedshiftCreateClusterSnapshotOperator(BaseOperator):
+    """
+    Creates a manual snapshot of the specified cluster. The cluster must be in the available state
+
+    :param snapshot_identifier: A unique identifier for the snapshot that you are requesting
+    :param cluster_identifier: The cluster identifier for which you want a snapshot
+    :param retention_period: The number of days that a manual snapshot is retained.
+        If the value is -1, the manual snapshot is retained indefinitely.
+    :param wait_for_completion: Whether to wait for the cluster to be in the ``available`` state
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        The default connection id is ``aws_default``
+    """
+
+    def __init__(
+        self,
+        *,
+        snapshot_identifier: str,
+        cluster_identifier: str,
+        retention_period: int = -1,
+        wait_for_completion: bool = False,
+        poll_interval: float = 5.0,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.snapshot_identifier = snapshot_identifier
+        self.cluster_identifier = cluster_identifier
+        self.retention_period = retention_period
+        self.wait_for_completion = wait_for_completion
+        self.poll_interval = poll_interval
+        self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)
+
+    def execute(self, context: "Context") -> Any:
+        cluster_state = self.redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state != "available":
+            raise AirflowException(
+                "Redshift cluster must be in available state. "
+                f"Redshift cluster current state is {cluster_state}"
+            )
+
+        self.redshift_hook.create_cluster_snapshot(
+            cluster_identifier=self.cluster_identifier,
+            snapshot_identifier=self.snapshot_identifier,
+            retention_period=self.retention_period,
+        )
+
+        if self.wait_for_completion:
+            cluster_status: str = self.check_status()

Review Comment:
   Makes sense. See 0528e9d0b867d8c6897db1403ec7f523e60cead7.





[GitHub] [airflow] vincbeck commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952759973


##########
tests/providers/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -99,6 +103,32 @@ def test_create_multi_node_cluster(self, mock_get_conn):
         )
 
 
+class TestRedshiftCreateClusterSnapshotOperator:
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
+    def test_create_cluster_snapshot_is_called_when_cluster_is_available(
+        self, mock_get_conn, mock_cluster_status
+    ):
+        mock_cluster_status.return_value = "available"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        create_snapshot.execute(None)
+        mock_get_conn.return_value.create_cluster_snapshot.assert_called_once_with(
+            ClusterIdentifier='test_cluster',
+            SnapshotIdentifier="test_snapshot",
+        )
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    def test_raise_exception_when_cluster_is_not_available(self, mock_cluster_status):
+        mock_cluster_status.return_value = "paused"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        with pytest.raises(AirflowException):
+            create_snapshot.execute(None)
+

Review Comment:
   Maybe add a third test for when `wait_for_completion` is `False`. You would essentially check that the waiter is not called.


