You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/02/22 20:19:54 UTC
[airflow] branch main updated: Add `wait_for_completion` param in `RedshiftCreateClusterOperator` (#29657)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ba2d562cfb Add `wait_for_completion` param in `RedshiftCreateClusterOperator` (#29657)
ba2d562cfb is described below
commit ba2d562cfb36c5b9b845251f991c3d5bfa17db4f
Author: Pankaj Singh <98...@users.noreply.github.com>
AuthorDate: Thu Feb 23 01:49:40 2023 +0530
Add `wait_for_completion` param in `RedshiftCreateClusterOperator` (#29657)
---
.../providers/amazon/aws/operators/redshift_cluster.py | 18 ++++++++++++++++++
.../amazon/aws/operators/test_redshift_cluster.py | 9 +++++++++
2 files changed, 27 insertions(+)
diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py
index f209581c63..0771d67a37 100644
--- a/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -84,6 +84,9 @@ class RedshiftCreateClusterOperator(BaseOperator):
:param default_iam_role_arn: ARN for the IAM role.
:param aws_conn_id: str = The Airflow connection used for AWS credentials.
The default connection id is ``aws_default``.
+ :param wait_for_completion: Whether wait for the cluster to be in ``available`` state
+ :param max_attempt: The maximum number of attempts to be made. Default: 5
+ :param poll_interval: The amount of time in seconds to wait between attempts. Default: 60
"""
template_fields: Sequence[str] = (
@@ -133,6 +136,9 @@ class RedshiftCreateClusterOperator(BaseOperator):
aqua_configuration_status: str | None = None,
default_iam_role_arn: str | None = None,
aws_conn_id: str = "aws_default",
+ wait_for_completion: bool = False,
+ max_attempt: int = 5,
+ poll_interval: int = 60,
**kwargs,
):
super().__init__(**kwargs)
@@ -170,6 +176,9 @@ class RedshiftCreateClusterOperator(BaseOperator):
self.aqua_configuration_status = aqua_configuration_status
self.default_iam_role_arn = default_iam_role_arn
self.aws_conn_id = aws_conn_id
+ self.wait_for_completion = wait_for_completion
+ self.max_attempt = max_attempt
+ self.poll_interval = poll_interval
self.kwargs = kwargs
def execute(self, context: Context):
@@ -242,6 +251,15 @@ class RedshiftCreateClusterOperator(BaseOperator):
self.master_user_password,
params,
)
+ if self.wait_for_completion:
+ redshift_hook.get_conn().get_waiter("cluster_available").wait(
+ ClusterIdentifier=self.cluster_identifier,
+ WaiterConfig={
+ "Delay": self.poll_interval,
+ "MaxAttempts": self.max_attempt,
+ },
+ )
+
self.log.info("Created Redshift cluster %s", self.cluster_identifier)
self.log.info(cluster)
diff --git a/tests/providers/amazon/aws/operators/test_redshift_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
index 35ad83cd3f..71e73ea802 100644
--- a/tests/providers/amazon/aws/operators/test_redshift_cluster.py
+++ b/tests/providers/amazon/aws/operators/test_redshift_cluster.py
@@ -57,6 +57,7 @@ class TestRedshiftCreateClusterOperator:
master_username="adminuser",
master_user_password="Test123$",
cluster_type="single-node",
+ wait_for_completion=True,
)
redshift_operator.execute(None)
params = {
@@ -76,6 +77,11 @@ class TestRedshiftCreateClusterOperator:
**params,
)
+ # wait_for_completion is True so check waiter is called
+ mock_get_conn.return_value.get_waiter.return_value.wait.assert_called_once_with(
+ ClusterIdentifier="test-cluster", WaiterConfig={"Delay": 60, "MaxAttempts": 5}
+ )
+
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
def test_create_multi_node_cluster(self, mock_get_conn):
redshift_operator = RedshiftCreateClusterOperator(
@@ -106,6 +112,9 @@ class TestRedshiftCreateClusterOperator:
**params,
)
+ # wait_for_completion is False so check waiter is not called
+ mock_get_conn.return_value.get_waiter.assert_not_called()
+
class TestRedshiftCreateClusterSnapshotOperator:
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")