You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/30 21:17:59 UTC
[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759631075
##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+ """
+ Pause an AWS Redshift Cluster using boto3.
Review comment:
```suggestion
Pause an AWS Redshift Cluster if it has status `available`.
```
##########
File path: tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import boto3
+
+from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator
+
+try:
+ from moto import mock_redshift
Review comment:
It does not appear that you use `mock_redshift` anywhere 🤔
##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -37,18 +37,25 @@ def __init__(
self,
*,
cluster_identifier: str,
- target_status: str = 'available',
+ target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE,
aws_conn_id: str = 'aws_default',
**kwargs,
):
super().__init__(**kwargs)
self.cluster_identifier = cluster_identifier
- self.target_status = target_status
+ self.target_status = (
+ target_status
+ if isinstance(target_status, RedshiftClusterStates)
+ else RedshiftClusterStates(str(target_status))
+ )
+
self.aws_conn_id = aws_conn_id
self.hook: Optional[RedshiftHook] = None
def poke(self, context):
- self.log.info('Poking for status : %s\nfor cluster %s', self.target_status, self.cluster_identifier)
+ self.log.info(
+ 'Poking for status : %s\nfor cluster %s', self.target_status.value, self.cluster_identifier
Review comment:
```suggestion
'Checking cluster %r for status %r', self.cluster_identifier, self.target_status.value
```
or
```suggestion
'Checking cluster %r\nwaiting for status %r', self.cluster_identifier, self.target_status.value
```
I suggest this because `for cluster %s` is not really a grammatical phrase on its own. Since it is printed on its own line, it seems that it should be one.
##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+ """
+ Pause an AWS Redshift Cluster using boto3.
+
+ :param cluster_identifier: id of the AWS Redshift Cluster
+ :type cluster_identifier: str
+ :param aws_conn_id: aws connection to use
+ :type aws_conn_id: str
+ :param check_interval: time in seconds that the job should wait in
+ between each instance state checks until operation is completed
+ :type check_interval: float
+ """
+
+ template_fields = ("cluster_identifier",)
+ ui_color = "#eeaa11"
+ ui_fgcolor = "#ffffff"
+
+ def __init__(
+ self,
+ *,
+ cluster_identifier: str,
+ aws_conn_id: str = "aws_default",
+ check_interval: float = 15,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.cluster_identifier = cluster_identifier
+ self.aws_conn_id = aws_conn_id
+ self.check_interval = check_interval
+
+ def execute(self, context):
+ redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+ self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
+ cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+ if cluster_state == RedshiftClusterStates.AVAILABLE:
Review comment:
If this condition is not met, I would log at info level, or perhaps even at warning level, since that situation seems unexpected.
##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+ """
+ Resume an AWS Redshift Cluster using boto3.
+
+ :param cluster_identifier: id of the AWS Redshift Cluster
+ :type cluster_identifier: str
+ :param aws_conn_id: aws connection to use
+ :type aws_conn_id: str
+ :param check_interval: time in seconds that the job should wait in
+ between each instance state checks until operation is completed
+ :type check_interval: float
+ """
+
+ template_fields = ("cluster_identifier",)
+ ui_color = "#eeaa11"
+ ui_fgcolor = "#ffffff"
+
+ def __init__(
+ self,
+ *,
+ cluster_identifier: str,
+ aws_conn_id: str = "aws_default",
+ check_interval: float = 15,
Review comment:
It appears that nothing is done with this parameter.
##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+ """
+ Resume an AWS Redshift Cluster using boto3.
Review comment:
```suggestion
Resume a paused AWS Redshift Cluster
```
##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+ """
+ Resume an AWS Redshift Cluster using boto3.
+
+ :param cluster_identifier: id of the AWS Redshift Cluster
+ :type cluster_identifier: str
+ :param aws_conn_id: aws connection to use
+ :type aws_conn_id: str
+ :param check_interval: time in seconds that the job should wait in
+ between each instance state checks until operation is completed
+ :type check_interval: float
+ """
+
+ template_fields = ("cluster_identifier",)
+ ui_color = "#eeaa11"
+ ui_fgcolor = "#ffffff"
+
+ def __init__(
+ self,
+ *,
+ cluster_identifier: str,
+ aws_conn_id: str = "aws_default",
+ check_interval: float = 15,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.cluster_identifier = cluster_identifier
+ self.aws_conn_id = aws_conn_id
+ self.check_interval = check_interval
+
+ def execute(self, context):
+ redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+ self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
+ cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+ if cluster_state == RedshiftClusterStates.PAUSED:
Review comment:
If this condition is not met, I would log it with `log.info`.
##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
"""
try:
response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
- return response[0]['ClusterStatus'] if response else None
+ return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None
Review comment:
I'm a little unsure about converting to an enum here. You don't control the API, so you can't guarantee that every value it returns will be in your enum. If an unexpected value appears here, the enum constructor will raise a `ValueError` and you'll get an unexpected failure. I think you can use the enum when evaluating whether the returned value is the one you are looking for. However, I don't think it's a good idea to convert it immediately without error handling. I suppose you could catch `ValueError` and return an uncategorized enum value or something... but it's probably better to just leave it raw. I bet @uranusjr would have some wisdom to share here.
##########
File path: tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import boto3
+
+from airflow.providers.amazon.aws.operators.redshift_resume_cluster import RedshiftResumeClusterOperator
+
+try:
+ from moto import mock_redshift
+except ImportError:
+ mock_redshift = None
+
+
+class TestResumeClusterOperator(unittest.TestCase):
+ @staticmethod
+ def _create_clusters():
+ client = boto3.client('redshift', region_name='us-east-1')
+ client.create_cluster(
+ ClusterIdentifier='test_cluster_to_pause',
+ NodeType='dc1.large',
+ MasterUsername='admin',
+ MasterUserPassword='mock_password',
+ )
+ client.create_cluster(
+ ClusterIdentifier='test_cluster_to_resume',
+ NodeType='dc1.large',
+ MasterUsername='admin',
+ MasterUserPassword='mock_password',
+ )
+ if not client.describe_clusters()['Clusters']:
+ raise ValueError('AWS not properly mocked')
+
+ def test_init(self):
Review comment:
Again, in this test you don't seem to be verifying the operator's behavior.
##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -37,18 +37,25 @@ def __init__(
self,
*,
cluster_identifier: str,
- target_status: str = 'available',
+ target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE,
aws_conn_id: str = 'aws_default',
**kwargs,
):
super().__init__(**kwargs)
self.cluster_identifier = cluster_identifier
- self.target_status = target_status
+ self.target_status = (
+ target_status
+ if isinstance(target_status, RedshiftClusterStates)
+ else RedshiftClusterStates(str(target_status))
+ )
+
Review comment:
why not simply
```suggestion
self.target_status = RedshiftClusterStates(target_status)
```
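This works even if `target_status` is already the enum, because calling an `Enum` class with one of its own members returns that member unchanged.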
##########
File path: docs/apache-airflow-providers-amazon/operators/redshift.rst
##########
@@ -94,3 +94,26 @@ All together, here is our DAG:
:language: python
:start-after: [START redshift_operator_howto_guide]
:end-before: [END redshift_operator_howto_guide]
+
+
+.. _howto/operator:RedshiftResumeClusterOperator:
+
+Resume a Redshift Cluster
+"""""""""""""""""""""""""""""""""""""""""""
+
+To resume an existing AWS Redshift Cluster you can use
+:class:`~airflow.providers.amazon.aws.operators.redshift_resume_cluster.RedshiftResumeClusterOperator`.
+
+This Operator leverages the AWS CLI
+`resume-cluster <https://docs.aws.amazon.com/cli/latest/reference/redshift/resume-cluster.html>`__ API
+
+.. _howto/operator:RedshiftPauseClusterOperator:
+
+Pause a Redshift Cluster
+"""""""""""""""""""""""""""""""""""""""""""
+
+To pause an existing AWS Redshift Cluster you can use
Review comment:
```suggestion
To pause an 'available' AWS Redshift Cluster you can use
```
##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+ """
+ Pause an AWS Redshift Cluster using boto3.
+
+ :param cluster_identifier: id of the AWS Redshift Cluster
+ :type cluster_identifier: str
+ :param aws_conn_id: aws connection to use
+ :type aws_conn_id: str
+ :param check_interval: time in seconds that the job should wait in
+ between each instance state checks until operation is completed
+ :type check_interval: float
+ """
+
+ template_fields = ("cluster_identifier",)
+ ui_color = "#eeaa11"
+ ui_fgcolor = "#ffffff"
+
+ def __init__(
+ self,
+ *,
+ cluster_identifier: str,
+ aws_conn_id: str = "aws_default",
+ check_interval: float = 15,
Review comment:
unused?
##########
File path: tests/providers/amazon/aws/sensors/test_redshift.py
##########
@@ -81,7 +96,7 @@ def test_poke_cluster_not_found(self):
timeout=5,
aws_conn_id='aws_default',
cluster_identifier='test_cluster_not_found',
- target_status='cluster_not_found',
+ target_status=RedshiftClusterStates.NONEXISTENT,
)
assert op.poke(None)
Review comment:
```suggestion
assert op.poke(None) is True
```
##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
"""
try:
response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
- return response[0]['ClusterStatus'] if response else None
+ return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None
except self.get_conn().exceptions.ClusterNotFoundFault:
- return 'cluster_not_found'
Review comment:
This is technically a breaking change: the return value here changes from this string to an enum member (which also has a _different_ value). _(Come to think of it, it's breaking for any value, not just for "not found".)_
@uranusjr, since I have your attention on this PR anyway, I'm wondering if you have any thoughts about what we should do here. Should we allow this breaking change and do a major release for this provider? Or should we say it's not worth it and keep backward compatibility somehow (e.g. by using `str, Enum` inheritance and `cluster_not_found` instead of `nonexistent`)?
##########
File path: docs/apache-airflow-providers-amazon/operators/redshift.rst
##########
@@ -94,3 +94,26 @@ All together, here is our DAG:
:language: python
:start-after: [START redshift_operator_howto_guide]
:end-before: [END redshift_operator_howto_guide]
+
+
+.. _howto/operator:RedshiftResumeClusterOperator:
+
+Resume a Redshift Cluster
+"""""""""""""""""""""""""""""""""""""""""""
+
+To resume an existing AWS Redshift Cluster you can use
Review comment:
```suggestion
To resume a 'paused' AWS Redshift Cluster you can use
```
##########
File path: tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import boto3
+
+from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator
+
+try:
+ from moto import mock_redshift
+except ImportError:
+ mock_redshift = None
+
+
+class TestPauseClusterOperator(unittest.TestCase):
Review comment:
You don't really seem to be testing the behavior of the operator.
I'd want to verify two things:
1. When the cluster is `available`, `pause_cluster` is called.
2. When the cluster has any other status, it is not called. Perhaps also verify that a warning is emitted instead, if you decide to add one.
##########
File path: tests/providers/amazon/aws/sensors/test_redshift.py
##########
@@ -56,6 +57,20 @@ def test_poke(self):
)
assert op.poke(None)
+ @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present')
+ @mock_redshift
+ def test_poke_with_cluster_state(self):
+ self._create_cluster()
+ op = AwsRedshiftClusterSensor(
+ task_id='test_cluster_sensor',
+ poke_interval=1,
+ timeout=5,
+ aws_conn_id='aws_default',
+ cluster_identifier='test_cluster',
+ target_status=RedshiftClusterStates.AVAILABLE,
Review comment:
So you already have a test that checks for `available`... I'm trying to think of how you could verify the new behavior, namely that the sensor waits for whatever state you pass here. Maybe you can think of a good way.
##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -37,18 +37,25 @@ def __init__(
self,
*,
cluster_identifier: str,
- target_status: str = 'available',
+ target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE,
aws_conn_id: str = 'aws_default',
**kwargs,
):
super().__init__(**kwargs)
self.cluster_identifier = cluster_identifier
- self.target_status = target_status
+ self.target_status = (
+ target_status
+ if isinstance(target_status, RedshiftClusterStates)
+ else RedshiftClusterStates(str(target_status))
+ )
+
Review comment:
Also, if you intend to be permissive, you should widen the type hint above to
```python
target_status: Union[RedshiftClusterStates, str] = RedshiftClusterStates.AVAILABLE,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org