You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/30 21:17:59 UTC

[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759631075



##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.

Review comment:
       ```suggestion
       Pause an AWS Redshift Cluster if it has status `available`.
   ```

##########
File path: tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import boto3
+
+from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator
+
+try:
+    from moto import mock_redshift

Review comment:
       It does not appear that you use this anywhere 🤔

##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -37,18 +37,25 @@ def __init__(
         self,
         *,
         cluster_identifier: str,
-        target_status: str = 'available',
+        target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE,
         aws_conn_id: str = 'aws_default',
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
-        self.target_status = target_status
+        self.target_status = (
+            target_status
+            if isinstance(target_status, RedshiftClusterStates)
+            else RedshiftClusterStates(str(target_status))
+        )
+
         self.aws_conn_id = aws_conn_id
         self.hook: Optional[RedshiftHook] = None
 
     def poke(self, context):
-        self.log.info('Poking for status : %s\nfor cluster %s', self.target_status, self.cluster_identifier)
+        self.log.info(
+            'Poking for status : %s\nfor cluster %s', self.target_status.value, self.cluster_identifier

Review comment:
       ```suggestion
               'Checking cluster %r for status %r', self.cluster_identifier, self.target_status.value
   ```
   or
   ```suggestion
               'Checking cluster %r\nwaiting for status %r', self.cluster_identifier, self.target_status.value
   ```
   
   because `for cluster %s` is not really a grammatical expression by itself, yet it sits on its own line, so it reads as though it should be one.

##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+        self.check_interval = check_interval
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
+        cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state == RedshiftClusterStates.AVAILABLE:

Review comment:
       If this condition is not met, I would log at info level, or perhaps even at warning level, because that situation seems unexpected.

##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+    """
+    Resume an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,

Review comment:
       It appears nothing is done with this parameter.

##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+    """
+    Resume an AWS Redshift Cluster using boto3.

Review comment:
       ```suggestion
       Resume a paused AWS Redshift Cluster
   ```

##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+    """
+    Resume an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+        self.check_interval = check_interval
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
+        cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state == RedshiftClusterStates.PAUSED:

Review comment:
       If this condition is not met, I would `log.info`.

##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       I'm a little unsure about converting to an enum here. You don't control the API, so you can't guarantee that the returned value will be in your enum. If an unexpected value appears here, you'll get an unexpected failure (a `ValueError` from the enum lookup). I think you can use the enum to evaluate whether the returned value is the one you are looking for, but I don't think it's a good idea to convert it immediately without error handling. I suppose you could catch `ValueError` and return an uncategorized enum value or something... but it's probably better to just leave it raw. I bet @uranusjr would have some wisdom to share here.

##########
File path: tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import boto3
+
+from airflow.providers.amazon.aws.operators.redshift_resume_cluster import RedshiftResumeClusterOperator
+
+try:
+    from moto import mock_redshift
+except ImportError:
+    mock_redshift = None
+
+
+class TestResumeClusterOperator(unittest.TestCase):
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+
+    def test_init(self):

Review comment:
       Again, in this one you don't seem to be verifying the behavior.

##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -37,18 +37,25 @@ def __init__(
         self,
         *,
         cluster_identifier: str,
-        target_status: str = 'available',
+        target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE,
         aws_conn_id: str = 'aws_default',
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
-        self.target_status = target_status
+        self.target_status = (
+            target_status
+            if isinstance(target_status, RedshiftClusterStates)
+            else RedshiftClusterStates(str(target_status))
+        )
+

Review comment:
       why not simply
   
   ```suggestion
           self.target_status = RedshiftClusterStates(target_status)
   ```
   
   This works even if `target_status` is already an enum member.

##########
File path: docs/apache-airflow-providers-amazon/operators/redshift.rst
##########
@@ -94,3 +94,26 @@ All together, here is our DAG:
     :language: python
     :start-after: [START redshift_operator_howto_guide]
     :end-before: [END redshift_operator_howto_guide]
+
+
+.. _howto/operator:RedshiftResumeClusterOperator:
+
+Resume a Redshift Cluster
+"""""""""""""""""""""""""""""""""""""""""""
+
+To resume an existing AWS Redshift Cluster you can use
+:class:`~airflow.providers.amazon.aws.operators.redshift_resume_cluster.RedshiftResumeClusterOperator`.
+
+This Operator leverages the AWS CLI
+`resume-cluster <https://docs.aws.amazon.com/cli/latest/reference/redshift/resume-cluster.html>`__ API
+
+.. _howto/operator:RedshiftPauseClusterOperator:
+
+Pause a Redshift Cluster
+"""""""""""""""""""""""""""""""""""""""""""
+
+To pause an existing AWS Redshift Cluster you can use

Review comment:
       ```suggestion
   To pause an 'available' AWS Redshift Cluster you can use
   ```

##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,

Review comment:
       unused?

##########
File path: tests/providers/amazon/aws/sensors/test_redshift.py
##########
@@ -81,7 +96,7 @@ def test_poke_cluster_not_found(self):
             timeout=5,
             aws_conn_id='aws_default',
             cluster_identifier='test_cluster_not_found',
-            target_status='cluster_not_found',
+            target_status=RedshiftClusterStates.NONEXISTENT,
         )
 
         assert op.poke(None)

Review comment:
       ```suggestion
           assert op.poke(None) is True
   ```

##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None
         except self.get_conn().exceptions.ClusterNotFoundFault:
-            return 'cluster_not_found'

Review comment:
       This is technically a breaking change: it converts the return value here from this string to the enum (which also has a _different_ value). _(Come to think of it, it's breaking for any value, not just for "not found".)_
   
   @uranusjr, since I have your attention on this PR anyway, I'm wondering if you have any thoughts about what we should do here. Should we allow this breaking change and do a major release for this provider? Or should we say it's not worth it and keep backward compatibility somehow (e.g. by using `str, Enum` inheritance and using `cluster_not_found` instead of `nonexistent`)?

##########
File path: docs/apache-airflow-providers-amazon/operators/redshift.rst
##########
@@ -94,3 +94,26 @@ All together, here is our DAG:
     :language: python
     :start-after: [START redshift_operator_howto_guide]
     :end-before: [END redshift_operator_howto_guide]
+
+
+.. _howto/operator:RedshiftResumeClusterOperator:
+
+Resume a Redshift Cluster
+"""""""""""""""""""""""""""""""""""""""""""
+
+To resume an existing AWS Redshift Cluster you can use

Review comment:
       ```suggestion
   To resume a 'paused' AWS Redshift Cluster you can use
   ```

##########
File path: tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import boto3
+
+from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator
+
+try:
+    from moto import mock_redshift
+except ImportError:
+    mock_redshift = None
+
+
+class TestPauseClusterOperator(unittest.TestCase):

Review comment:
       You don't really seem to be testing the behavior of the operator.
   
   I'd want to verify two things: 1. when the cluster is `available`, `pause_cluster` is called; 2. when the cluster has any other status, it is not called (and perhaps verify that a warning is emitted instead, if you decide to add one).

##########
File path: tests/providers/amazon/aws/sensors/test_redshift.py
##########
@@ -56,6 +57,20 @@ def test_poke(self):
         )
         assert op.poke(None)
 
+    @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present')
+    @mock_redshift
+    def test_poke_with_cluster_state(self):
+        self._create_cluster()
+        op = AwsRedshiftClusterSensor(
+            task_id='test_cluster_sensor',
+            poke_interval=1,
+            timeout=5,
+            aws_conn_id='aws_default',
+            cluster_identifier='test_cluster',
+            target_status=RedshiftClusterStates.AVAILABLE,

Review comment:
       So you already have a test that checks for `available`... I'm trying to think how you could verify the new behavior, namely that the sensor waits for whatever state you put here. Maybe you can think of a good way.

##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -37,18 +37,25 @@ def __init__(
         self,
         *,
         cluster_identifier: str,
-        target_status: str = 'available',
+        target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE,
         aws_conn_id: str = 'aws_default',
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
-        self.target_status = target_status
+        self.target_status = (
+            target_status
+            if isinstance(target_status, RedshiftClusterStates)
+            else RedshiftClusterStates(str(target_status))
+        )
+

Review comment:
       Also, if you intend to be permissive (accepting plain strings as well as enum members), you should type-hint the parameter above with:
   
   ```python
   target_status: Union[RedshiftClusterStates, str] = RedshiftClusterStates.AVAILABLE,
   ```
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org