Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/17 23:30:40 UTC

[GitHub] [airflow] dbarrundiag opened a new pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

dbarrundiag opened a new pull request #19665:
URL: https://github.com/apache/airflow/pull/19665


   This commit adds two new features to the Airflow AWS Redshift module:
   
   #1 - Adding a `wait_for_state` method to the Redshift hook
   
   Similar to the EC2 hook in `airflow/providers/amazon/aws/hooks/ec2.py`, this commit implements a `wait_for_state` method in the Redshift module `airflow/providers/amazon/aws/hooks/redshift.py`. This allows developers to build workflows that depend on the status of a Redshift cluster.
   
   #2 - New operators to manage the status of a Redshift cluster
   Additionally, we are implementing two new operators: `RedshiftResumeClusterOperator` and `RedshiftPauseClusterOperator`. These let developers use Airflow to resume and pause Redshift clusters before/after loading data, optimizing for cost.
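   
   For context, a minimal usage sketch of how the two operators might be wired into a DAG. This is an illustration only: the `dag_id`, schedule, and cluster identifier are assumptions, and the import path matches the `operators/redshift.py` module the review below converges on.
   
   ```python
   # Hypothetical DAG wiring; names below are assumptions, not part of this PR.
   from datetime import datetime
   
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.redshift import (
       RedshiftPauseClusterOperator,
       RedshiftResumeClusterOperator,
   )
   
   with DAG(
       dag_id="redshift_cost_saver",          # assumed name
       start_date=datetime(2021, 11, 1),
       schedule_interval="@daily",
       catchup=False,
   ) as dag:
       resume = RedshiftResumeClusterOperator(
           task_id="resume_cluster",
           cluster_identifier="my-redshift-cluster",  # assumed identifier
       )
       pause = RedshiftPauseClusterOperator(
           task_id="pause_cluster",
           cluster_identifier="my-redshift-cluster",
       )
       # Data-loading tasks would sit between resume and pause.
       resume >> pause
   ```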


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760454315



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       yes, please let's avoid breaking
   
   in my view you are still welcome to use the enum for the purpose of evaluating the response from the API, but just don't force the raw response into the Enum
   
   thank you πŸ™
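   
   A minimal sketch of the pattern being suggested here, assuming a `RedshiftClusterStates` enum like the one in this PR: the hook keeps returning the raw string, and callers compare it against the enum's `.value`:
   
   ```python
   from enum import Enum
   
   
   class RedshiftClusterStates(Enum):
       """Assumed subset of the enum introduced in this PR."""
   
       AVAILABLE = 'available'
       PAUSED = 'paused'
   
   
   def should_resume(raw_status: str) -> bool:
       # Evaluate the raw API string against the enum's value instead of
       # coercing it, so an unexpected status never raises ValueError here.
       return raw_status == RedshiftClusterStates.PAUSED.value
   ```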





[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r754594313



##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -43,7 +43,10 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
-        self.target_status = target_status
+        self.target_status = (
+            target_status if isinstance(target_status, ClusterStates) else ClusterStates(str(target_status))
+        )
+
         self.aws_conn_id = aws_conn_id
         self.hook: Optional[RedshiftHook] = None
 

Review comment:
       I think it's a good idea to show the `.value`; it's cleaner in my opinion. I made the change





[GitHub] [airflow] o-nikolas commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760507529



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       +1 for one file per service





[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760555125



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       no objection from me
   
   however I will defend having separate modules when it makes sense. if a service has a _lot_ of operators, or very long and complicated operators, or certain kinds or families of operators for the same service, why not split into a few modules 🀷





[GitHub] [airflow] josh-fell commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r753948606



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -138,6 +138,26 @@ def create_cluster_snapshot(self, snapshot_identifier: str, cluster_identifier:
         )
         return response['Snapshot'] if response['Snapshot'] else None
 
+    def wait_for_state(self, cluster_identifier: str, target_state: str, check_interval: float) -> None:

Review comment:
       There is a design pattern in several operators that includes the option, rather than a forced action, to wait for termination/completion, coupled with a sensor as @o-nikolas mentions:
   https://github.com/apache/airflow/blob/main/airflow/providers/microsoft/azure/operators/data_factory.py
   https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/dataflow.py
   https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/dataproc.py
   https://github.com/apache/airflow/blob/main/airflow/providers/tableau/operators/tableau_refresh_workbook.py
   
   This way users could choose whether to wait for the task to complete as part of the operator or use a sensor to poke for status (mainly useful for long-running actions). Of course, Deferrable Operators solve for this too πŸ™‚ 
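   
   A rough sketch of that optional-wait shape, under assumed names (`wait_for_completion` and the poll loop below are illustrative, not this PR's code):
   
   ```python
   import time
   
   
   class PauseClusterSketch:
       """Shape of the optional-wait pattern only; not this PR's implementation."""
   
       def __init__(self, cluster_identifier, wait_for_completion=False, check_interval=15.0):
           self.cluster_identifier = cluster_identifier
           self.wait_for_completion = wait_for_completion
           self.check_interval = check_interval
   
       def execute(self, hook):
           hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)
           if self.wait_for_completion:
               # Poll only when asked to; otherwise users can pair the
               # operator with the existing sensor for long-running actions.
               while hook.cluster_status(self.cluster_identifier) != 'paused':
                   time.sleep(self.check_interval)
   ```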





[GitHub] [airflow] o-nikolas commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759698875



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       Good discussion folks!
   
   IMO if there are breaking API changes and the API starts returning different states than you expect, it's going to break your code whether you're using strings or an enum, so it's a moot point. I still vote for using Enums from the start here, adding error handling; if APIs change, then this code and other code in the related operators will require attention, like any other breaking API change would.





[GitHub] [airflow] o-nikolas commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760457905



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       Hey @dstandish,
   
   Yes, you would need to correctly handle the case of a new status, but there's no reason you can't correctly handle this as well as use Enums (use error handling and coerce anything we haven't seen yet to an UNKNOWN enum, for example, or just continue to return None like other error cases can do in this existing code). Certainly each approach has pros and cons, but I see no breaking reason if you choose to use Enums.
   
   Though I'm happy to disagree and commit to using strings if that's what others think as well :)  
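   
   A sketch of the coercion being described; the `UNKNOWN` member is hypothetical, added only for illustration:
   
   ```python
   from enum import Enum
   
   
   class RedshiftClusterStates(Enum):
       """Assumed subset of the PR's enum, plus a hypothetical UNKNOWN member."""
   
       AVAILABLE = 'available'
       PAUSED = 'paused'
       UNKNOWN = 'unknown'
   
   
   def to_state(raw_status: str) -> RedshiftClusterStates:
       try:
           return RedshiftClusterStates(raw_status)
       except ValueError:
           # Coerce any status the enum hasn't seen yet instead of raising.
           return RedshiftClusterStates.UNKNOWN
   ```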





[GitHub] [airflow] dstandish merged pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish merged pull request #19665:
URL: https://github.com/apache/airflow/pull/19665


   



[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r765422377



##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,69 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator(unittest.TestCase):
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+
+    def test_init(self):
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
+        )
+        assert redshift_operator.task_id == "task_test"
+        assert redshift_operator.cluster_identifier == "test_cluster"
+        assert redshift_operator.aws_conn_id == "aws_conn_test"
+
+    def test_resume_cluster(self):

Review comment:
       no worries at all, thank _you_ for your effort and contribution





[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r765861164



##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,113 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator:
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+

Review comment:
       ```suggestion
   ```
   
   You can remove this since it's not used.

##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,113 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator:
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+
+    def test_init(self):
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
+        )
+        assert redshift_operator.task_id == "task_test"
+        assert redshift_operator.cluster_identifier == "test_cluster"
+        assert redshift_operator.aws_conn_id == "aws_conn_test"
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn")
+    def test_resume_cluster_is_called_when_cluster_is_paused(self, mock_get_conn, mock_cluster_status):
+        conn = MagicMock()
+        mock_run = conn.resume_cluster
+        mock_get_conn.return_value = conn
+
+        mock_cluster_status.return_value = 'paused'
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
+        )
+        redshift_operator.execute(None)
+        mock_run.assert_called_once_with(ClusterIdentifier='test_cluster')

Review comment:
       ```suggestion
           mock_cluster_status.return_value = 'paused'
           redshift_operator = RedshiftResumeClusterOperator(
               task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
           )
           redshift_operator.execute(None)
           mock_get_conn.return_value.resume_cluster.assert_called_once_with(ClusterIdentifier='test_cluster')
   ```
   
   We can actually simplify this a tad bit.
   





[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r754263007



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -138,6 +138,26 @@ def create_cluster_snapshot(self, snapshot_identifier: str, cluster_identifier:
         )
         return response['Snapshot'] if response['Snapshot'] else None
 
+    def wait_for_state(self, cluster_identifier: str, target_state: str, check_interval: float) -> None:

Review comment:
       got it, thanks @josh-fell. The latest commit fixes this and removes `wait_for_state`; this way users can choose to use the (already available) sensor.





[GitHub] [airflow] o-nikolas commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r754589627



##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -43,7 +43,10 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
-        self.target_status = target_status
+        self.target_status = (
+            target_status if isinstance(target_status, ClusterStates) else ClusterStates(str(target_status))
+        )
+
         self.aws_conn_id = aws_conn_id
         self.hook: Optional[RedshiftHook] = None
 

Review comment:
       Note:
   The logging on line 54 below isn't going to look like it did before now that it's an Enum. Adding `.value` to the Enum will create the same log.
   ```python
   In [16]: from enum import Enum                                                                                        
   
   In [17]: class ClusterStates(Enum): 
       ...:     """Contains the possible State values of a Redshift Cluster.""" 
       ...:     AVAILABLE = 'available' 
       ...:     CREATING = 'creating' 
       ...:                                                                                                              
   
   In [18]: target_state = ClusterStates.AVAILABLE                                                                       
   
   In [19]: print('looking for %s state' % target_state)                                                                 
   looking for ClusterStates.AVAILABLE state
   
   In [20]: print('looking for %s state' % target_state.value)                                                           
   looking for available state
   
   In [21]:  
   ```
   
   But perhaps it's actually better to leave it as is so that the logging makes it clear that it's an Enum? I don't feel strongly one way or the other, actually. What do you think?
   





[GitHub] [airflow] ferruzzi commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759838433



##########
File path: tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py
##########
@@ -52,10 +52,8 @@ def test_init(self):
         redshift_operator = RedshiftPauseClusterOperator(
             task_id="task_test",
             cluster_identifier="test_cluster",
-            aws_conn_id="aws_conn_test",
-            check_interval=3,
+            aws_conn_id="aws_conn_test"

Review comment:
       You have a default value in the Operator for this; am I missing a reason to have it defined here? Looks like an opportunity to just use the default.





[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759670233



##########
File path: tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import boto3
+
+from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator
+
+try:
+    from moto import mock_redshift
+except ImportError:
+    mock_redshift = None
+
+
+class TestPauseClusterOperator(unittest.TestCase):

Review comment:
       thanks so much for the feedback @dstandish! I completely agree with your assessment; the reason I was not able to write this unit test is that the `moto` library still does not support the `pause_cluster()` or `resume_cluster()` calls, so running this operator against the "mock" cluster raises a `NotImplementedError` :s
   
   
   This is the issue I opened with `moto` -> https://github.com/spulec/moto/issues/4591
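   
   Given that gap in `moto`, one workaround (and roughly where the thread lands later, per the test suggestions further down) is to mock the hook directly instead; a sketch:
   
   ```python
   from unittest import mock
   
   from airflow.providers.amazon.aws.operators.redshift import RedshiftPauseClusterOperator
   
   
   @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status")
   @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn")
   def test_pause_cluster_is_called_when_available(mock_get_conn, mock_cluster_status):
       # With the hook patched, no real AWS call (and no moto support) is needed.
       mock_cluster_status.return_value = 'available'
       op = RedshiftPauseClusterOperator(task_id="pause", cluster_identifier="test_cluster")
       op.execute(None)
       mock_get_conn.return_value.pause_cluster.assert_called_once_with(
           ClusterIdentifier='test_cluster'
       )
   ```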





[GitHub] [airflow] ferruzzi commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759838433



##########
File path: tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py
##########
@@ -52,10 +52,8 @@ def test_init(self):
         redshift_operator = RedshiftPauseClusterOperator(
             task_id="task_test",
             cluster_identifier="test_cluster",
-            aws_conn_id="aws_conn_test",
-            check_interval=3,
+            aws_conn_id="aws_conn_test"

Review comment:
       Here and below: You have a default value in the Operator for this; am I missing a reason to have it defined here? Looks like an opportunity to just use the default.





[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760454315



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       yes, please let's avoid breaking
   
   in my view you are still welcome to use the enum for the purpose of evaluating the response from the API (and also for returning the "not found" `value`), but just don't force the raw response into the Enum
   
   thank you πŸ™





[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760478968



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       > coerce anything we haven't seen yet to an UNKNOWN Enum state, or just continue to return None like other error cases can do in this existing code
   
   Yup, it's true this is an approach that would be more tolerant of the unexpected; it's just not what was done here.
   
   But I would say there is little value in implementing such error handling in an effort to still convert this return value. If your value conforms to the enum, then your comparisons against the enum would work even without converting in the return. And if it doesn't conform, then you either fail (current code state) or throw away information (replacing with None or UNKNOWN). So what's the point of converting it to an enum at all? I would just leave the raw values alone and use the enums for evaluation.
   
   > Though I'm happy to disagree and commit to using strings if that's what others think as well :)
   
   I don't know that we need to make a universal proclamation, but in this case I think not converting to an enum makes sense :)





[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r764378031



##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,69 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator(unittest.TestCase):

Review comment:
       we no longer subclass unittest.TestCase (no subclass required)
   

##########
File path: airflow/providers/amazon/aws/operators/redshift.py
##########
@@ -71,3 +71,85 @@ def execute(self, context: dict) -> None:
         self.log.info(f"Executing statement: {self.sql}")
         hook = self.get_hook()
         hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+    """
+    Resume a paused AWS Redshift Cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftResumeClusterOperator`
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
+        cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state == 'paused':
+            redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)

Review comment:
       ```suggestion
           cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
           if cluster_state == 'paused':
               self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
               redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
   ```
   
    should probably only say "starting" if it's actually gonna try to do it

##########
File path: airflow/providers/amazon/aws/operators/redshift.py
##########
@@ -71,3 +71,85 @@ def execute(self, context: dict) -> None:
         self.log.info(f"Executing statement: {self.sql}")
         hook = self.get_hook()
         hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+    """
+    Resume a paused AWS Redshift Cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftResumeClusterOperator`
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
+        cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state == 'paused':
+            redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
+        else:
+            self.log.warning(
+                "Unable to resume cluster since cluster is currently in status: %s", cluster_state
+            )
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster if it has status `available`.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftPauseClusterOperator`
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
+        cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state == 'available':
+            redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)

Review comment:
       ```suggestion
           cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
           if cluster_state == 'available':
               self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
               redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)
   ```

##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,69 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator(unittest.TestCase):
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+
+    def test_init(self):
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
+        )
+        assert redshift_operator.task_id == "task_test"
+        assert redshift_operator.cluster_identifier == "test_cluster"
+        assert redshift_operator.aws_conn_id == "aws_conn_test"
+
+    def test_resume_cluster(self):

Review comment:
       you can (and should) actually add some reasonable tests for this operator.
   
   use standard mocking techniques
   
    for the resume case you can at least verify that when the returned cluster state is `paused`, `resume_cluster` is called
   
   similar for the pause case
   

##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,69 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator(unittest.TestCase):
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')

Review comment:
       are we using this? If not, we should chop it.





[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r764419580



##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,69 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator(unittest.TestCase):
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+
+    def test_init(self):
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
+        )
+        assert redshift_operator.task_id == "task_test"
+        assert redshift_operator.cluster_identifier == "test_cluster"
+        assert redshift_operator.aws_conn_id == "aws_conn_test"
+
+    def test_resume_cluster(self):

Review comment:
       I would 100% want to add a unit test, but can't due to the mock issue I describe here: https://github.com/apache/airflow/pull/19665#discussion_r759670233 and https://github.com/spulec/moto/issues/4591
   
   That's why I added the TODO until `#4591` is resolved.





[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760454315



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       yes, please let's avoid breaking
   
   in my view you are still welcome to use the enum for the purpose of evaluating the response from the API (and also, if you like, for returning the "not found" `value`), but just don't force the raw response into the Enum
   
   thank you πŸ™





[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759670233



##########
File path: tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import boto3
+
+from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator
+
+try:
+    from moto import mock_redshift
+except ImportError:
+    mock_redshift = None
+
+
+class TestPauseClusterOperator(unittest.TestCase):

Review comment:
       thanks so much for the feedback @dstandish! I completely agree with your assessment; the reason I was not able to write this unit test is that the `moto` library still does not support the `pause_cluster()` or `resume_cluster()` calls, so running this operator against the "mock" cluster raises a `NotImplementedError` :s





[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759676530



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       @o-nikolas had some good thoughts here on using the `Enum`:
   https://github.com/apache/airflow/pull/19665#discussion_r753507478
   
   essentially because that's what was used on the similar EKS operators, e.g. `airflow/airflow/providers/amazon/aws/hooks/eks.py`





[GitHub] [airflow] ferruzzi commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760511965



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       For what it's worth, I also considered pointing that out and I agree. One file per service (well... one file in hooks, one in operators, one in sensors, etc.) should be the way to go IMHO.
   
   `/airflow/providers/amazon/aws/operators/service_name.py` would make it easy to find what you are looking for.





[GitHub] [airflow] o-nikolas commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760507529



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       +1 for a file per service





[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759678644



##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -37,18 +37,25 @@ def __init__(
         self,
         *,
         cluster_identifier: str,
-        target_status: str = 'available',
+        target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE,
         aws_conn_id: str = 'aws_default',
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
-        self.target_status = target_status
+        self.target_status = (
+            target_status
+            if isinstance(target_status, RedshiftClusterStates)
+            else RedshiftClusterStates(str(target_status))
+        )
+

Review comment:
       I didn't know it would work even if `target_status` is the enum :) thanks for that
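   
   For reference, this is standard `Enum` behavior: calling the enum class with an existing member returns that member, so the `isinstance` guard is not strictly required. A tiny sketch:
   
   ```python
   from enum import Enum
   
   
   class ClusterStates(Enum):
       AVAILABLE = 'available'
   
   
   # Enum lookup accepts an existing member as well as its value,
   # and returns the member itself in both cases.
   assert ClusterStates(ClusterStates.AVAILABLE) is ClusterStates.AVAILABLE
   assert ClusterStates('available') is ClusterStates.AVAILABLE
   ```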





[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760487292



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       Thanks @eladkal! Should I move the 2 operators to `redshift.py` then?





[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760555125



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       no objection from me
   
   however i will defend having separate modules when it makes sense.  if a service has a _lot_ of operators, or very long and complicated operators, or certain kinds or families of operators for the same service, why not split into a few modules 🀷
   
   but one module seems like a good "default" or starting point







[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r765369339



##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,69 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator(unittest.TestCase):
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+
+    def test_init(self):
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
+        )
+        assert redshift_operator.task_id == "task_test"
+        assert redshift_operator.cluster_identifier == "test_cluster"
+        assert redshift_operator.aws_conn_id == "aws_conn_test"
+
+    def test_resume_cluster(self):

Review comment:
       you are absolutely right @dstandish - i should have done more research on MagicMock and not given up just because `moto` didn't have this API implemented.
   
   I have been able to add unittests now using the strategy you suggested, THANK YOU SO MUCH!







[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r765957126



##########
File path: tests/providers/amazon/aws/sensors/test_redshift.py
##########
@@ -39,8 +39,6 @@ def _create_cluster():
             MasterUsername='admin',
             MasterUserPassword='mock_password',
         )
-        if not client.describe_clusters()['Clusters']:
-            raise ValueError('AWS not properly mocked')

Review comment:
       i think this may not have been intended

##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -48,7 +48,7 @@ def __init__(
         self.hook: Optional[RedshiftHook] = None
 
     def poke(self, context):
-        self.log.info('Poking for status : %s\nfor cluster %s', self.target_status, self.cluster_identifier)
+        self.log.info('Checking cluster %r for status %r', self.cluster_identifier, self.target_status)

Review comment:
       probably best to remove this change too since it's unrelated.  it made sense when you were changing everything to enum and touching this bit of code anyway.  but not anymore







[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r764429805



##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,69 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator(unittest.TestCase):
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+
+    def test_init(self):
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
+        )
+        assert redshift_operator.task_id == "task_test"
+        assert redshift_operator.cluster_identifier == "test_cluster"
+        assert redshift_operator.aws_conn_id == "aws_conn_test"
+
+    def test_resume_cluster(self):

Review comment:
       I understand you might want to be able to use that library.  But you don't need to.  And anyway just because you can't use that library, it doesn't mean we waive our requirement that things be tested.
   
   It is pretty straightforward to mock the return value from `cluster_status`.
   
   Then you could have `get_conn` return a MagicMock and assert that, on it, `pause_cluster` is called.
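   
   a minimal sketch of that strategy (assuming the operator module path used in this PR; names and values are illustrative):
   
   ```python
   import unittest
   from unittest import mock
   
   from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator
   
   
   class TestPauseClusterOperator(unittest.TestCase):
       @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status")
       @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn")
       def test_pause_cluster_called_when_available(self, mock_get_conn, mock_cluster_status):
           # stub the status check so no real AWS call is made
           mock_cluster_status.return_value = 'available'
           op = RedshiftPauseClusterOperator(task_id="pause", cluster_identifier="test_cluster")
           op.execute(None)
           # get_conn() returns a MagicMock; assert pause_cluster was called on it
           mock_get_conn.return_value.pause_cluster.assert_called_once_with(
               ClusterIdentifier='test_cluster'
           )
   ```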







[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r765862641



##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,113 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator:
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+
+    def test_init(self):
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
+        )
+        assert redshift_operator.task_id == "task_test"
+        assert redshift_operator.cluster_identifier == "test_cluster"
+        assert redshift_operator.aws_conn_id == "aws_conn_test"
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn")
+    def test_resume_cluster_is_called_when_cluster_is_paused(self, mock_get_conn, mock_cluster_status):
+        conn = MagicMock()
+        mock_run = conn.resume_cluster
+        mock_get_conn.return_value = conn
+
+        mock_cluster_status.return_value = 'paused'
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
+        )
+        redshift_operator.execute(None)
+        mock_run.assert_called_once_with(ClusterIdentifier='test_cluster')

Review comment:
       ```suggestion
           mock_cluster_status.return_value = 'paused'
           redshift_operator = RedshiftResumeClusterOperator(
               task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
           )
           redshift_operator.execute(None)
           mock_get_conn.return_value.resume_cluster.assert_called_once_with(ClusterIdentifier='test_cluster')
   ```
   
   We can actually simplify this a bit.
   
   Same for `test_resume_cluster_not_called_when_cluster_is_not_paused` too







[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760504696



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       For what it's worth, I do think it's kind of crazy to have a "file-per-operator". If my vote counts for anything I would prefer to move the operators to `redshift.py`







[GitHub] [airflow] eladkal commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760576596



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       In the next few days I'll open an issue with a full mapping of the AWS operators and we can discuss each case.
   
   For this specific PR it appears that we are all agreed, so @dbarrundiag you can proceed with a single file :)







[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759631075



##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.

Review comment:
       ```suggestion
       Pause an AWS Redshift Cluster if it has status `available`.
   ```

##########
File path: tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import boto3
+
+from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator
+
+try:
+    from moto import mock_redshift

Review comment:
       it does not appear that you use this anywhere πŸ€”

##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -37,18 +37,25 @@ def __init__(
         self,
         *,
         cluster_identifier: str,
-        target_status: str = 'available',
+        target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE,
         aws_conn_id: str = 'aws_default',
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
-        self.target_status = target_status
+        self.target_status = (
+            target_status
+            if isinstance(target_status, RedshiftClusterStates)
+            else RedshiftClusterStates(str(target_status))
+        )
+
         self.aws_conn_id = aws_conn_id
         self.hook: Optional[RedshiftHook] = None
 
     def poke(self, context):
-        self.log.info('Poking for status : %s\nfor cluster %s', self.target_status, self.cluster_identifier)
+        self.log.info(
+            'Poking for status : %s\nfor cluster %s', self.target_status.value, self.cluster_identifier

Review comment:
       ```suggestion
               'Checking cluster %r for status %r', self.cluster_identifier, self.target_status.value
   ```
   or
   ```suggestion
               'Checking cluster %r\nwaiting for status %r', self.cluster_identifier, self.target_status.value
   ```
   
   because `for cluster %s` is not really a grammatical expression by itself and it's on a newline so it seems that it should be.

##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+        self.check_interval = check_interval
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
+        cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state == RedshiftClusterStates.AVAILABLE:

Review comment:
       if the condition is not met i would log at info, or perhaps even warning, because it seems unexpected.
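   
   a sketch of what that could look like (the warning wording is illustrative):
   
   ```python
   cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
   if cluster_state == RedshiftClusterStates.AVAILABLE:
       self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
       redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)
   else:
       # unexpected state: surface it instead of silently doing nothing
       self.log.warning(
           "Unable to pause cluster %s: it is in state %s, not 'available'",
           self.cluster_identifier,
           cluster_state,
       )
   ```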

##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+    """
+    Resume an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,

Review comment:
       it appears nothing is done with this parameter

##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+    """
+    Resume an AWS Redshift Cluster using boto3.

Review comment:
       ```suggestion
       Resume a paused AWS Redshift Cluster
   ```

##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+    """
+    Resume an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+        self.check_interval = check_interval
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
+        cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state == RedshiftClusterStates.PAUSED:

Review comment:
       if this condition is not met i would `log.info`

##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       i'm a little unsure about converting to enum here.  You don't control the API so you can't guarantee that you will have the value in your enum.  If an unexpected value appears here, you'll get an unexpected failure.  I think you can use the enum for evaluating whether the returned value is the one you are looking for, but I don't think it's a good idea to immediately convert it without error handling.  I suppose you could catch `ValueError` and return an uncategorized enum value or something....  but probably better to just leave it raw.  i bet @uranusjr would have some wisdom to share here.
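   
   for reference, the catch-`ValueError` variant might look something like this (a sketch; the UNCATEGORIZED member is hypothetical):
   
   ```python
   def cluster_status(self, cluster_identifier: str) -> RedshiftClusterStates:
       try:
           response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
       except self.get_conn().exceptions.ClusterNotFoundFault:
           return RedshiftClusterStates.NONEXISTENT
       if not response:
           return RedshiftClusterStates.NONEXISTENT
       try:
           return RedshiftClusterStates(response[0]['ClusterStatus'])
       except ValueError:
           # a status string the enum does not know yet; don't blow up the task
           return RedshiftClusterStates.UNCATEGORIZED
   ```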

##########
File path: tests/providers/amazon/aws/operators/test_redshift_resume_cluster.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import boto3
+
+from airflow.providers.amazon.aws.operators.redshift_resume_cluster import RedshiftResumeClusterOperator
+
+try:
+    from moto import mock_redshift
+except ImportError:
+    mock_redshift = None
+
+
+class TestResumeClusterOperator(unittest.TestCase):
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+
+    def test_init(self):

Review comment:
       again in this one you don't seem to be verifying the behavior

##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -37,18 +37,25 @@ def __init__(
         self,
         *,
         cluster_identifier: str,
-        target_status: str = 'available',
+        target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE,
         aws_conn_id: str = 'aws_default',
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
-        self.target_status = target_status
+        self.target_status = (
+            target_status
+            if isinstance(target_status, RedshiftClusterStates)
+            else RedshiftClusterStates(str(target_status))
+        )
+

Review comment:
       why not simply
   
   ```suggestion
           self.target_status = RedshiftClusterStates(target_status)
   ```
   
   it does work even if target_status is the enum
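   
   as a quick sanity check of why the one-liner is enough (passing a member back through the Enum constructor just returns the member):
   
   ```python
   from enum import Enum
   
   class RedshiftClusterStates(Enum):
       AVAILABLE = 'available'
       PAUSED = 'paused'
   
   # both spellings normalize to the same member
   assert RedshiftClusterStates('available') is RedshiftClusterStates.AVAILABLE
   assert RedshiftClusterStates(RedshiftClusterStates.AVAILABLE) is RedshiftClusterStates.AVAILABLE
   ```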

##########
File path: docs/apache-airflow-providers-amazon/operators/redshift.rst
##########
@@ -94,3 +94,26 @@ All together, here is our DAG:
     :language: python
     :start-after: [START redshift_operator_howto_guide]
     :end-before: [END redshift_operator_howto_guide]
+
+
+.. _howto/operator:RedshiftResumeClusterOperator:
+
+Resume a Redshift Cluster
+"""""""""""""""""""""""""""""""""""""""""""
+
+To resume an existing AWS Redshift Cluster you can use
+:class:`~airflow.providers.amazon.aws.operators.redshift_resume_cluster.RedshiftResumeClusterOperator`.
+
+This Operator leverages the AWS CLI
+`resume-cluster <https://docs.aws.amazon.com/cli/latest/reference/redshift/resume-cluster.html>`__ API
+
+.. _howto/operator:RedshiftPauseClusterOperator:
+
+Pause a Redshift Cluster
+"""""""""""""""""""""""""""""""""""""""""""
+
+To pause an existing AWS Redshift Cluster you can use

Review comment:
       ```suggestion
   To pause an 'available' AWS Redshift Cluster you can use
   ```

##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,

Review comment:
       unused?

##########
File path: tests/providers/amazon/aws/sensors/test_redshift.py
##########
@@ -81,7 +96,7 @@ def test_poke_cluster_not_found(self):
             timeout=5,
             aws_conn_id='aws_default',
             cluster_identifier='test_cluster_not_found',
-            target_status='cluster_not_found',
+            target_status=RedshiftClusterStates.NONEXISTENT,
         )
 
         assert op.poke(None)

Review comment:
       ```suggestion
           assert op.poke(None) is True
   ```

##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None
         except self.get_conn().exceptions.ClusterNotFoundFault:
-            return 'cluster_not_found'

Review comment:
       this is technically a breaking change, converting the return here from this string value to the enum (also with a _different_ value) _(come to think of it, it's breaking for any value, not just for not found)_
   
   @uranusjr since i have your attention on this PR anyway, wondering if you have any thoughts about what we should do here.  should we allow this breaking change and do a major release for this provider?  or should we say it's not worth it and keep backward compatibility somehow (e.g. by using `str, Enum` inheritance and using `cluster_not_found` instead of `nonexistent`)?
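   
   a sketch of the `str, Enum` option, which would keep existing string comparisons working:
   
   ```python
   from enum import Enum
   
   class RedshiftClusterStates(str, Enum):
       AVAILABLE = 'available'
       PAUSED = 'paused'
       NONEXISTENT = 'cluster_not_found'  # keep the legacy string value
   
   # callers comparing against the old plain strings are unaffected
   assert RedshiftClusterStates.NONEXISTENT == 'cluster_not_found'
   assert RedshiftClusterStates('cluster_not_found') is RedshiftClusterStates.NONEXISTENT
   ```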

##########
File path: docs/apache-airflow-providers-amazon/operators/redshift.rst
##########
@@ -94,3 +94,26 @@ All together, here is our DAG:
     :language: python
     :start-after: [START redshift_operator_howto_guide]
     :end-before: [END redshift_operator_howto_guide]
+
+
+.. _howto/operator:RedshiftResumeClusterOperator:
+
+Resume a Redshift Cluster
+"""""""""""""""""""""""""""""""""""""""""""
+
+To resume an existing AWS Redshift Cluster you can use

Review comment:
       ```suggestion
   To resume a 'paused' AWS Redshift Cluster you can use
   ```

##########
File path: tests/providers/amazon/aws/operators/test_redshift_pause_cluster.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import boto3
+
+from airflow.providers.amazon.aws.operators.redshift_pause_cluster import RedshiftPauseClusterOperator
+
+try:
+    from moto import mock_redshift
+except ImportError:
+    mock_redshift = None
+
+
+class TestPauseClusterOperator(unittest.TestCase):

Review comment:
       you don't really seem to be testing the behavior of the operator.
   
   i'd want to verify two things: 1. that when the cluster is `available`, `pause_cluster` is called; 2. that when the cluster has any other status it's not called (and perhaps verify that a warning is emitted instead, if you decide to add one)

##########
File path: tests/providers/amazon/aws/sensors/test_redshift.py
##########
@@ -56,6 +57,20 @@ def test_poke(self):
         )
         assert op.poke(None)
 
+    @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present')
+    @mock_redshift
+    def test_poke_with_cluster_state(self):
+        self._create_cluster()
+        op = AwsRedshiftClusterSensor(
+            task_id='test_cluster_sensor',
+            poke_interval=1,
+            timeout=5,
+            aws_conn_id='aws_default',
+            cluster_identifier='test_cluster',
+            target_status=RedshiftClusterStates.AVAILABLE,

Review comment:
       so you already have a test that checks for `available`... i'm trying to think how you can verify the new behavior, namely that it will wait for whatever state you put here --- maybe you can think of a good way
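   
   one option would be to stub the hook and check both sides of the comparison (a sketch; the mocked values are illustrative):
   
   ```python
   from unittest import mock
   
   from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates
   from airflow.providers.amazon.aws.sensors.redshift import AwsRedshiftClusterSensor
   
   
   @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status")
   def test_poke_waits_for_requested_state(mock_cluster_status):
       op = AwsRedshiftClusterSensor(
           task_id='test_cluster_sensor',
           cluster_identifier='test_cluster',
           target_status=RedshiftClusterStates.PAUSED,
       )
       mock_cluster_status.return_value = RedshiftClusterStates.AVAILABLE
       assert op.poke(None) is False  # not yet in the requested state
       mock_cluster_status.return_value = RedshiftClusterStates.PAUSED
       assert op.poke(None) is True
   ```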

##########
File path: airflow/providers/amazon/aws/sensors/redshift.py
##########
@@ -37,18 +37,25 @@ def __init__(
         self,
         *,
         cluster_identifier: str,
-        target_status: str = 'available',
+        target_status: RedshiftClusterStates = RedshiftClusterStates.AVAILABLE,
         aws_conn_id: str = 'aws_default',
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
-        self.target_status = target_status
+        self.target_status = (
+            target_status
+            if isinstance(target_status, RedshiftClusterStates)
+            else RedshiftClusterStates(str(target_status))
+        )
+

Review comment:
       also, if you intend to be permissive you should hint above with
   
   ```python
   target_status: Union[RedshiftClusterStates, str] = RedshiftClusterStates.AVAILABLE,
   ```
   
   
   







[GitHub] [airflow] o-nikolas commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760457905



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       Hey @dstandish,
   
   Yes, you would need to correctly handle the case of an unknown/new status, but there's no reason you can't handle this correctly and still use Enums (e.g. coerce anything we haven't seen yet to an UNKNOWN Enum state, or just continue to return None like other error cases in this existing code). Certainly each approach has pros and cons, but I see no breaking reason if you choose to use Enums.
   
   Though I'm happy to disagree and commit to using strings if that's what others think as well :)  
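   
   The coerce-to-unknown idea can even be done declaratively with Enum's `_missing_` hook (a sketch; the UNKNOWN member is hypothetical):
   
   ```python
   from enum import Enum
   
   class RedshiftClusterStates(Enum):
       AVAILABLE = 'available'
       PAUSED = 'paused'
       UNKNOWN = 'unknown'
   
       @classmethod
       def _missing_(cls, value):
           # any status string the enum has not seen yet maps to UNKNOWN
           return cls.UNKNOWN
   
   assert RedshiftClusterStates('brand-new-aws-status') is RedshiftClusterStates.UNKNOWN
   ```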







[GitHub] [airflow] ashb commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760356427



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       Hooks are necessarily tied to the API/service they wrap, and I agree with @dstandish -- we don't want our airflow Hook to suddenly _not work at all_ when AWS adds a new status code to their API.







[GitHub] [airflow] o-nikolas commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r753513595



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -138,6 +138,26 @@ def create_cluster_snapshot(self, snapshot_identifier: str, cluster_identifier:
         )
         return response['Snapshot'] if response['Snapshot'] else None
 
+    def wait_for_state(self, cluster_identifier: str, target_state: str, check_interval: float) -> None:

Review comment:
       Thanks for the contribution!
   
   Instead of writing this wait method in the hook and injecting it into many of the operators, you can create a sensor that waits for states. That sensor can then be added to DAGs as needed. It's a more graceful solution, especially when combined with something like Smart Sensors.
   
   You can see an example in the recently developed EKS code. You can see the sensor here:
   https://github.com/apache/airflow/blob/752575cb84a28c41723b5151fd6451a72f4da5fb/airflow/providers/amazon/aws/sensors/eks.py#L54
   
   And you can see it's usage in the EKS example dags, for example:
   https://github.com/apache/airflow/blob/752575cb84a28c41723b5151fd6451a72f4da5fb/airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py#L93
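   
   For Redshift that could look like this in a DAG (a sketch using the existing `AwsRedshiftClusterSensor`; the identifier and intervals are illustrative):
   
   ```python
   from airflow.providers.amazon.aws.sensors.redshift import AwsRedshiftClusterSensor
   
   wait_for_cluster = AwsRedshiftClusterSensor(
       task_id='wait_for_cluster_available',
       cluster_identifier='my-redshift-cluster',
       target_status='available',
       poke_interval=60,   # check once a minute
       timeout=60 * 30,    # give up after 30 minutes
   )
   ```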







[GitHub] [airflow] josh-fell edited a comment on pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
josh-fell edited a comment on pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#issuecomment-975136304


   WDYT about adding information about these new operators to the existing AWS operator docs?
   
   https://github.com/apache/airflow/tree/main/docs/apache-airflow-providers-amazon/operators





[GitHub] [airflow] eladkal commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760469246



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       This is just my opinion but I think that the AWS provider has too many files.
   I think ideally operators for the same service should be together in the same file - just like the GCP provider.
   E.g. we shouldn't have `redshift_resume_cluster.py` & `redshift_pause_cluster.py`; we should have a `redshift.py` that contains the 2 operator classes.







[GitHub] [airflow] eladkal commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760493986



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       @dstandish WDYT?
   I just love how the GCP provider is organized, with a file per service: https://github.com/apache/airflow/tree/main/airflow/providers/google/cloud/operators







[GitHub] [airflow] josh-fell commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r753948606



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -138,6 +138,26 @@ def create_cluster_snapshot(self, snapshot_identifier: str, cluster_identifier:
         )
         return response['Snapshot'] if response['Snapshot'] else None
 
+    def wait_for_state(self, cluster_identifier: str, target_state: str, check_interval: float) -> None:

Review comment:
       There is a design pattern in several operators that include the option, rather than a forced action, to wait for termination/completion if desired. 
   https://github.com/apache/airflow/blob/main/airflow/providers/microsoft/azure/operators/data_factory.py
   https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/dataflow.py
   https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/dataproc.py
   https://github.com/apache/airflow/blob/main/airflow/providers/tableau/operators/tableau_refresh_workbook.py
   
   This way users could choose whether to wait for the task to complete as part of the operator or use a sensor to poke for status (mainly useful for long-running actions). Of course, Deferrable Operators solve for this too πŸ™‚ 
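   
   Applied to this PR, the opt-in wait could look like this in the operator (a sketch; `wait_for_completion` is a hypothetical flag on top of the `wait_for_state` method added here):
   
   ```python
   def execute(self, context):
       redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
       self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
       redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)
       if self.wait_for_completion:  # opt in; very long waits may suit a sensor better
           redshift_hook.wait_for_state(
               cluster_identifier=self.cluster_identifier,
               target_state='paused',
               check_interval=self.check_interval,
           )
   ```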







[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r753566061



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -138,6 +138,26 @@ def create_cluster_snapshot(self, snapshot_identifier: str, cluster_identifier:
         )
         return response['Snapshot'] if response['Snapshot'] else None
 
+    def wait_for_state(self, cluster_identifier: str, target_state: str, check_interval: float) -> None:

Review comment:
       Thanks @o-nikolas! I was trying to follow the same pattern as the EC2 modules:
   `airflow/providers/amazon/aws/hooks/ec2.py`
   and
   `airflow/providers/amazon/aws/operators/ec2_start_instance.py`
   
   would you still prefer that I follow the EKS example?
   
   I kind of feel like I could develop the Sensor, but maybe also leave the `wait_for_state` in the Operator, as a safety check? What do you think?







[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r764373711



##########
File path: airflow/providers/amazon/aws/operators/redshift.py
##########
@@ -71,3 +71,85 @@ def execute(self, context: dict) -> None:
         self.log.info(f"Executing statement: {self.sql}")
         hook = self.get_hook()
         hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
+
+
+class RedshiftResumeClusterOperator(BaseOperator):
+    """
+    Resume a paused AWS Redshift Cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftResumeClusterOperator`
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
+        cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state == 'paused':
+            redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)

Review comment:
       ```suggestion
           cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
           if cluster_state == 'paused':
               self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
               redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
   ```







[GitHub] [airflow] ferruzzi commented on pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#issuecomment-983267124


   I like the changes to the docs.  Also left a small comment on the Operator tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#issuecomment-992796879


   The PR is likely OK to be merged with just a subset of tests for the default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#issuecomment-992826786


   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#issuecomment-975136304


   WDYT about adding information about these new operators to the existing AWS operator docs?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] o-nikolas commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r754539855



##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import ClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+        self.check_interval = check_interval
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
+        cluster_state = ClusterStates(redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier))
+        if cluster_state == ClusterStates.AVAILABLE:

Review comment:
       Looking good!
   
   Do you mind moving the Enum cast inside the `cluster_status` method? Same as I linked in my original comment:
   https://github.com/apache/airflow/blob/752575cb84a28c41723b5151fd6451a72f4da5fb/airflow/providers/amazon/aws/hooks/eks.py#L364-L368
   
   That way you don't have to wrap every call to `cluster_status` in `ClusterStates`. You may have to update some existing uses of `cluster_status` that currently expect a string so they expect an enum instead, but that shouldn't be too bad.
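   
   A rough sketch of that refactor -- the `CLUSTER_NOT_FOUND` member and the exception handling are assumptions for illustration, not code from this PR:
   
   ```python
   def cluster_status(self, cluster_identifier: str) -> ClusterStates:
       """Return the status of the cluster as a ``ClusterStates`` member."""
       conn = self.get_conn()
       try:
           response = conn.describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
           return ClusterStates(response[0]['ClusterStatus']) if response else ClusterStates.CLUSTER_NOT_FOUND
       except conn.exceptions.ClusterNotFoundFault:
           return ClusterStates.CLUSTER_NOT_FOUND
   ```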




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759689379



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       I read the comment, and yeah, in the 'not found' case that is the focus of the example, sure, returning an enum would be fine -- it's completely in your control and a "made-up" status -- but forcing the raw API response into an enum is what I feel more iffy about.
   
   And I'd happily say keep the enum there and use it when evaluating the response (e.g. is it paused or available -- use the enum to evaluate the string against rather than hardcoding). I'm just unsure about converting the raw response in the return value, and am asking for a second opinion :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] o-nikolas commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759698875



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       Good discussion, folks!
   
   IMO if there are breaking API changes and the API starts returning different states than you expect, it's going to break your code whether you're using strings or an enum, so it's a moot point. I still vote for using Enums from the start here, adding error handling, and if the APIs change then this code and other code in the operators will require attention, like any other breaking API change would.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r754579222



##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import ClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+        self.check_interval = check_interval
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
+        cluster_state = ClusterStates(redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier))
+        if cluster_state == ClusterStates.AVAILABLE:

Review comment:
       Done! Thanks again @o-nikolas for all the help!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r764938773



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       Follow-up issue: https://github.com/apache/airflow/issues/20139




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] o-nikolas commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r753507478



##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+        self.check_interval = check_interval
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
+        cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state == 'available':

Review comment:
       Having the states be strings leads to magic strings everywhere, like this one.
   
   In the recently developed EKS operator we added an enum for the possible states. It cleans up the code and also lets users discover/autocomplete the available states.
   
   The states are defined like so:
   https://github.com/apache/airflow/blob/752575cb84a28c41723b5151fd6451a72f4da5fb/airflow/providers/amazon/aws/hooks/eks.py#L43-L51
   
   Then the get_state hook method returns the enum values like so:
   https://github.com/apache/airflow/blob/752575cb84a28c41723b5151fd6451a72f4da5fb/airflow/providers/amazon/aws/hooks/eks.py#L364-L368
   
   And then it's all used together like so:
   https://github.com/apache/airflow/blob/752575cb84a28c41723b5151fd6451a72f4da5fb/airflow/providers/amazon/aws/operators/eks.py#L181-L188
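   
   For reference, a sketch of what such an enum could look like for Redshift (an illustrative, non-exhaustive subset of the states the API can report):
   
   ```python
   from enum import Enum
   
   class ClusterStates(Enum):
       """A few of the cluster states Redshift can report (illustrative subset)."""
       AVAILABLE = 'available'
       CREATING = 'creating'
       DELETING = 'deleting'
       PAUSED = 'paused'
       RESUMING = 'resuming'
   ```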




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r765884994



##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,113 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator:
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+

Review comment:
       Done!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ferruzzi commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r754503713



##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import ClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+        self.check_interval = check_interval
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
+        cluster_state = ClusterStates(redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier))
+        if cluster_state == ClusterStates.AVAILABLE:
+            redshift_hook.get_conn().pause_cluster(ClusterIdentifier=self.cluster_identifier)

Review comment:
       First, thanks for the contribution!  As a new contributor myself, I'll make a comment but leave it up to a Committer to decide if it's a required change.
   
   I'd suggest adding these as methods in the hook as well.  That would make your code a bit cleaner and make it easier for others to write custom Operators.  For example, check out how the [hooks/redshift.py](https://github.com/ferruzzi/airflow/blob/main/airflow/providers/amazon/aws/hooks/redshift.py#L72) file handles `delete_cluster`.  It basically just offloads the `get_conn()` part and passes the variables through, but then in the Operator it's a cleaner `redshift_hook.delete_cluster(ClusterIdentifier=self.cluster_identifier)` instead of having to stub in `get_conn()` everywhere in your Operator.
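   
   For example, a sketch of such hook methods modeled on `delete_cluster` -- the shape here is an assumption, though `pause_cluster`/`resume_cluster` are real boto3 Redshift client calls:
   
   ```python
   def pause_cluster(self, cluster_identifier: str):
       """Pause the cluster by delegating to the underlying boto3 client."""
       return self.get_conn().pause_cluster(ClusterIdentifier=cluster_identifier)
   
   def resume_cluster(self, cluster_identifier: str):
       """Resume a paused cluster by delegating to the underlying boto3 client."""
       return self.get_conn().resume_cluster(ClusterIdentifier=cluster_identifier)
   ```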




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r753564209



##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_identifier = cluster_identifier
+        self.aws_conn_id = aws_conn_id
+        self.check_interval = check_interval
+
+    def execute(self, context):
+        redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
+        self.log.info("Pausing Redshift cluster %s", self.cluster_identifier)
+        cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        if cluster_state == 'available':

Review comment:
       excellent - will do this πŸ‘ 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#issuecomment-972275135


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally; it’s a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better πŸš€.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759873574



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       > IMO if there are breaking API changes and the API starts returning different states than you expect, it's going to break your code whether you're using strings or an enum, so it's a moot point
   
   Not necessarily.  This hook method is just returning status.  If new statuses appear (or if you missed one in your research), it won't break your code, unless you force it into an enum that doesn't know the new status.  And it's far from certain that a new status would break your operator.  If you are waiting for your cluster to become `available` and there's a new status `kindof-almost-available`, well, that's still not available, so your logic is unaffected -- unless of course you are forcing it into an enum, and then your code breaks unnecessarily.  That's the scenario that makes me wary of this approach.
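   
   To make the failure mode concrete (illustrative snippet, using the made-up status from above):
   
   ```python
   # Coercing an unknown raw status into the enum raises immediately:
   RedshiftClusterStates('kindof-almost-available')   # ValueError: 'kindof-almost-available' is not a valid RedshiftClusterStates
   
   # ...whereas a plain string simply fails the comparison and the wait continues:
   'kindof-almost-available' == 'available'           # False
   ```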




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r759670586



##########
File path: airflow/providers/amazon/aws/operators/redshift_pause_cluster.py
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftPauseClusterOperator(BaseOperator):
+    """
+    Pause an AWS Redshift Cluster using boto3.
+
+    :param cluster_identifier: id of the AWS Redshift Cluster
+    :type cluster_identifier: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param check_interval: time in seconds that the job should wait in
+        between each instance state checks until operation is completed
+    :type check_interval: float
+    """
+
+    template_fields = ("cluster_identifier",)
+    ui_color = "#eeaa11"
+    ui_fgcolor = "#ffffff"
+
+    def __init__(
+        self,
+        *,
+        cluster_identifier: str,
+        aws_conn_id: str = "aws_default",
+        check_interval: float = 15,

Review comment:
       will remove :) (it was used in a previous implementation)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760412312



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       Thanks so much @dstandish and @ashb! Should we go back to using the strings and leave `cluster_not_found` to avoid the breaking change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760478968



##########
File path: airflow/providers/amazon/aws/hooks/redshift.py
##########
@@ -65,9 +81,9 @@ def cluster_status(self, cluster_identifier: str) -> str:
         """
         try:
             response = self.get_conn().describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters']
-            return response[0]['ClusterStatus'] if response else None
+            return RedshiftClusterStates(response[0]['ClusterStatus']) if response else None

Review comment:
       > coerce anything we haven't seen yet to an UNKNOWN Enum state, or just continue to return None like other error cases can do in this existing code
   
   Yup, it's true that approach would be more tolerant of the unexpected; it's just not what was done here.
   
   But I would say there is little value in converting the return value.  If your value conforms to the enum, then your comparisons against the enum would work even without converting in the return.  And if it doesn't conform, then you either fail (the current code's behavior) or throw away information (replacing with None or UNKNOWN).  So what's the point of converting it to an enum at all?  I would just leave the raw values alone and use the enums for evaluation.
   
   > Though I'm happy to disagree and commit to using strings if that's what others think as well :)
   
   I don't know that we need to make a universal proclamation, but in this case I think not converting to an enum makes sense :)
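   
   Concretely, the evaluation-only style would look something like this (a sketch against the operator code above, with an assumed `PAUSED` member -- not the merged implementation):
   
   ```python
   # Hook keeps returning the raw string; the enum is only used on the comparison side.
   cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
   if cluster_state == RedshiftClusterStates.PAUSED.value:
       self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
       redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
   ```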




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r760504696



##########
File path: airflow/providers/amazon/aws/operators/redshift_resume_cluster.py
##########
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftClusterStates, RedshiftHook
+
+
+class RedshiftResumeClusterOperator(BaseOperator):

Review comment:
       For what it's worth, I do think it's kind of crazy to have a "file-per-operator". If my vote counts for anything, I would prefer to have the operators in `redshift.py`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r766071424



##########
File path: tests/providers/amazon/aws/sensors/test_redshift.py
##########
@@ -39,8 +39,6 @@ def _create_cluster():
             MasterUsername='admin',
             MasterUserPassword='mock_password',
         )
-        if not client.describe_clusters()['Clusters']:
-            raise ValueError('AWS not properly mocked')

Review comment:
       Thanks for catching - done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dbarrundiag commented on a change in pull request #19665: Add two operators in AWS Providers: RedshiftResumeClusterOperator and RedshiftPauseClusterOperator

Posted by GitBox <gi...@apache.org>.
dbarrundiag commented on a change in pull request #19665:
URL: https://github.com/apache/airflow/pull/19665#discussion_r765885209



##########
File path: tests/providers/amazon/aws/operators/test_redshift.py
##########
@@ -42,3 +47,113 @@ def test_redshift_operator(self, test_autocommit, test_parameters, mock_get_hook
             autocommit=test_autocommit,
             parameters=test_parameters,
         )
+
+
+class TestResumeClusterOperator:
+    @staticmethod
+    def _create_clusters():
+        client = boto3.client('redshift', region_name='us-east-1')
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_pause',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        client.create_cluster(
+            ClusterIdentifier='test_cluster_to_resume',
+            NodeType='dc1.large',
+            MasterUsername='admin',
+            MasterUserPassword='mock_password',
+        )
+        if not client.describe_clusters()['Clusters']:
+            raise ValueError('AWS not properly mocked')
+
+    def test_init(self):
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
+        )
+        assert redshift_operator.task_id == "task_test"
+        assert redshift_operator.cluster_identifier == "test_cluster"
+        assert redshift_operator.aws_conn_id == "aws_conn_test"
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status")
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn")
+    def test_resume_cluster_is_called_when_cluster_is_paused(self, mock_get_conn, mock_cluster_status):
+        conn = MagicMock()
+        mock_run = conn.resume_cluster
+        mock_get_conn.return_value = conn
+
+        mock_cluster_status.return_value = 'paused'
+        redshift_operator = RedshiftResumeClusterOperator(
+            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
+        )
+        redshift_operator.execute(None)
+        mock_run.assert_called_once_with(ClusterIdentifier='test_cluster')

Review comment:
       Done!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org