You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/02/22 20:19:54 UTC

[airflow] branch main updated: Add `wait_for_completion` param in `RedshiftCreateClusterOperator` (#29657)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ba2d562cfb Add `wait_for_completion` param in `RedshiftCreateClusterOperator` (#29657)
ba2d562cfb is described below

commit ba2d562cfb36c5b9b845251f991c3d5bfa17db4f
Author: Pankaj Singh <98...@users.noreply.github.com>
AuthorDate: Thu Feb 23 01:49:40 2023 +0530

    Add `wait_for_completion` param in `RedshiftCreateClusterOperator` (#29657)
---
 .../providers/amazon/aws/operators/redshift_cluster.py | 18 ++++++++++++++++++
 .../amazon/aws/operators/test_redshift_cluster.py      |  9 +++++++++
 2 files changed, 27 insertions(+)

diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py
index f209581c63..0771d67a37 100644
--- a/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -84,6 +84,9 @@ class RedshiftCreateClusterOperator(BaseOperator):
     :param default_iam_role_arn: ARN for the IAM role.
     :param aws_conn_id: str = The Airflow connection used for AWS credentials.
         The default connection id is ``aws_default``.
+    :param wait_for_completion: Whether wait for the cluster to be in ``available`` state
+    :param max_attempt: The maximum number of attempts to be made. Default: 5
+    :param poll_interval: The amount of time in seconds to wait between attempts. Default: 60
     """
 
     template_fields: Sequence[str] = (
@@ -133,6 +136,9 @@ class RedshiftCreateClusterOperator(BaseOperator):
         aqua_configuration_status: str | None = None,
         default_iam_role_arn: str | None = None,
         aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = False,
+        max_attempt: int = 5,
+        poll_interval: int = 60,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -170,6 +176,9 @@ class RedshiftCreateClusterOperator(BaseOperator):
         self.aqua_configuration_status = aqua_configuration_status
         self.default_iam_role_arn = default_iam_role_arn
         self.aws_conn_id = aws_conn_id
+        self.wait_for_completion = wait_for_completion
+        self.max_attempt = max_attempt
+        self.poll_interval = poll_interval
         self.kwargs = kwargs
 
     def execute(self, context: Context):
@@ -242,6 +251,15 @@ class RedshiftCreateClusterOperator(BaseOperator):
             self.master_user_password,
             params,
         )
+        if self.wait_for_completion:
+            redshift_hook.get_conn().get_waiter("cluster_available").wait(
+                ClusterIdentifier=self.cluster_identifier,
+                WaiterConfig={
+                    "Delay": self.poll_interval,
+                    "MaxAttempts": self.max_attempt,
+                },
+            )
+
         self.log.info("Created Redshift cluster %s", self.cluster_identifier)
         self.log.info(cluster)
 
diff --git a/tests/providers/amazon/aws/operators/test_redshift_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
index 35ad83cd3f..71e73ea802 100644
--- a/tests/providers/amazon/aws/operators/test_redshift_cluster.py
+++ b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
@@ -57,6 +57,7 @@ class TestRedshiftCreateClusterOperator:
             master_username="adminuser",
             master_user_password="Test123$",
             cluster_type="single-node",
+            wait_for_completion=True,
         )
         redshift_operator.execute(None)
         params = {
@@ -76,6 +77,11 @@ class TestRedshiftCreateClusterOperator:
             **params,
         )
 
+        # wait_for_completion is True so check waiter is called
+        mock_get_conn.return_value.get_waiter.return_value.wait.assert_called_once_with(
+            ClusterIdentifier="test-cluster", WaiterConfig={"Delay": 60, "MaxAttempts": 5}
+        )
+
     @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
     def test_create_multi_node_cluster(self, mock_get_conn):
         redshift_operator = RedshiftCreateClusterOperator(
@@ -106,6 +112,9 @@ class TestRedshiftCreateClusterOperator:
             **params,
         )
 
+        # wait_for_completion is False so check waiter is not called
+        mock_get_conn.return_value.get_waiter.assert_not_called()
+
 
 class TestRedshiftCreateClusterSnapshotOperator:
     @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")