You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/23 15:15:45 UTC

[GitHub] [airflow] vincbeck commented on a diff in pull request #25857: Add redshift create cluster snapshot operator

vincbeck commented on code in PR #25857:
URL: https://github.com/apache/airflow/pull/25857#discussion_r952759973


##########
tests/providers/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -99,6 +103,32 @@ def test_create_multi_node_cluster(self, mock_get_conn):
         )
 
 
+class TestRedshiftCreateClusterSnapshotOperator:
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
+    def test_create_cluster_snapshot_is_called_when_cluster_is_available(
+        self, mock_get_conn, mock_cluster_status
+    ):
+        mock_cluster_status.return_value = "available"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        create_snapshot.execute(None)
+        mock_get_conn.return_value.create_cluster_snapshot.assert_called_once_with(
+            ClusterIdentifier='test_cluster',
+            SnapshotIdentifier="test_snapshot",
+        )
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    def test_raise_exception_when_cluster_is_not_available(self, mock_cluster_status):
+        mock_cluster_status.return_value = "paused"
+        create_snapshot = RedshiftCreateClusterSnapshotOperator(
+            task_id="test_snapshot", cluster_identifier="test_cluster", snapshot_identifier="test_snapshot"
+        )
+        with pytest.raises(AirflowException):
+            create_snapshot.execute(None)
+

Review Comment:
   Maybe add a third test for when `wait_for_completion` is `False`. You would essentially check that the waiter is not called.



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -242,6 +243,66 @@ def execute(self, context: 'Context'):
         self.log.info(cluster)
 
 
+class RedshiftCreateClusterSnapshotOperator(BaseOperator):
+    """
+    Creates a manual snapshot of the specified cluster. The cluster must be in the available state
+

Review Comment:
   ```suggestion
   
       .. seealso::
           For more information on how to use this operator, take a look at the guide:
           :ref:`howto/operator:RedshiftCreateClusterSnapshotOperator`
           
   ```



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -242,6 +243,66 @@ def execute(self, context: 'Context'):
         self.log.info(cluster)
 
 
+class RedshiftCreateClusterSnapshotOperator(BaseOperator):
+    """
+    Creates a manual snapshot of the specified cluster. The cluster must be in the available state
+
+    :param snapshot_identifier: A unique identifier for the snapshot that you are requesting
+    :param cluster_identifier: The cluster identifier for which you want a snapshot
+    :param retention_period: The number of days that a manual snapshot is retained.
+        If the value is -1, the manual snapshot is retained indefinitely.
+    :param wait_for_completion: Whether wait for cluster to be in ``available`` state
+    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        The default connection id is ``aws_default``
+    """
+
+    def __init__(
+        self,
+        *,
+        snapshot_identifier: str,
+        cluster_identifier: str,
+        retention_period: int = -1,
+        wait_for_completion: bool = False,
+        poll_interval: float = 5.0,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.snapshot_identifier = snapshot_identifier
+        self.cluster_identifier = cluster_identifier
+        self.retention_period = retention_period
+        self.wait_for_completion = wait_for_completion
+        self.poll_interval = poll_interval
+        self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)
+
+    def execute(self, context: "Context") -> Any:
+        cluster_state = self.redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state != "available":
+            raise AirflowException(
+                "Redshift cluster must be in available state. "
+                f"Redshift cluster current state is {cluster_state}"
+            )
+
+        self.redshift_hook.create_cluster_snapshot(
+            cluster_identifier=self.cluster_identifier,
+            snapshot_identifier=self.snapshot_identifier,
+            retention_period=self.retention_period,
+        )
+
+        if self.wait_for_completion:
+            cluster_status: str = self.check_status()

Review Comment:
   You might want to use a waiter; that would make the code much simpler and cleaner. See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Waiter.SnapshotAvailable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org