Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/16 05:46:22 UTC

[GitHub] [airflow] hankehly opened a new pull request, #27076: Add RdsStopDbOperator and RdsStartDbOperator

hankehly opened a new pull request, #27076:
URL: https://github.com/apache/airflow/pull/27076

   Closes: #25952
   Related: #26003
   
   ### Summary
   
   This PR adds the following two operators to the Amazon provider package.
   1. RdsStartDbOperator - Starts an RDS instance or cluster
   1. RdsStopDbOperator - Stops an RDS instance or cluster
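   
   A rough usage sketch of the two operators (the DAG settings and the "my-db-instance" identifier are placeholders for illustration, not part of this PR):
   
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.providers.amazon.aws.operators.rds import (
       RdsStartDbOperator,
       RdsStopDbOperator,
   )

   with DAG(
       dag_id="example_rds_start_stop",
       start_date=datetime(2022, 10, 1),
       schedule_interval=None,
       catchup=False,
   ) as dag:
       # Start the database, then stop it again once downstream work is done.
       start_db = RdsStartDbOperator(
           task_id="start_db",
           db_identifier="my-db-instance",
           db_type="instance",
       )
       stop_db = RdsStopDbOperator(
           task_id="stop_db",
           db_identifier="my-db-instance",
           db_type="instance",
       )
       start_db >> stop_db
   ```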
   
   ### Todo
   
   - [x] Add `RdsStopDbOperator`
   - [x] Add `RdsStartDbOperator`
   - [ ] Update docs
   - [x] Add unit tests
   - [x] Add system tests
   - [ ] Run system tests




[GitHub] [airflow] hankehly commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996395638


##########
docs/apache-airflow-providers-amazon/operators/rds.rst:
##########
@@ -166,6 +166,32 @@ To delete a AWS DB instance you can use
     :start-after: [START howto_operator_rds_delete_db_instance]
     :end-before: [END howto_operator_rds_delete_db_instance]
 
+
+Start a database instance or cluster
+====================================
+
+To start an AWS DB instance or cluster you can use
+:class:`~airflow.providers.amazon.aws.operators.rds.RdsStartDbOperator`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_rds_instance.py
+    :language: python
+    :dedent: 4

Review Comment:
   ![Screen Shot 2022-10-16 at 15 18 34](https://user-images.githubusercontent.com/11639738/196021251-66f26876-95d5-406d-bcc8-92b68c562e04.png)





[GitHub] [airflow] vincbeck commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r998640521


##########
docs/apache-airflow-providers-amazon/operators/rds.rst:
##########
@@ -166,6 +166,35 @@ To delete a AWS DB instance you can use
     :start-after: [START howto_operator_rds_delete_db_instance]
     :end-before: [END howto_operator_rds_delete_db_instance]
 
+.. _howto/operator:RdsStartDbOperator:
+
+Start a database instance or cluster
+====================================
+
+To start an AWS DB instance or cluster you can use

Review Comment:
   Actually to make it more consistent with the rest of the doc, I would go with
   
   ```suggestion
   To start an Amazon RDS DB instance or cluster you can use
   ```



##########
docs/apache-airflow-providers-amazon/operators/rds.rst:
##########
@@ -166,6 +166,35 @@ To delete a AWS DB instance you can use
     :start-after: [START howto_operator_rds_delete_db_instance]
     :end-before: [END howto_operator_rds_delete_db_instance]
 
+.. _howto/operator:RdsStartDbOperator:
+
+Start a database instance or cluster
+====================================
+
+To start an AWS DB instance or cluster you can use
+:class:`~airflow.providers.amazon.aws.operators.rds.RdsStartDbOperator`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_rds_instance.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_rds_start_db]
+    :end-before: [END howto_operator_rds_start_db]
+
+
+.. _howto/operator:RdsStopDbOperator:
+
+Stop a database instance or cluster
+===================================
+
+To stop an AWS DB instance or cluster you can use

Review Comment:
   ```suggestion
   To stop an Amazon RDS DB instance or cluster you can use
   ```





[GitHub] [airflow] eladkal merged pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
eladkal merged PR #27076:
URL: https://github.com/apache/airflow/pull/27076




[GitHub] [airflow] vincbeck commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997301565


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   I agree, we should use waiters as much as possible. They come with useful features such as configurable polling and retries.
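   
   For reference, a minimal standalone sketch of the boto3 waiter call the operator relies on (the client setup, identifier, and WaiterConfig values below are illustrative assumptions):
   
   ```python
   import boto3

   rds = boto3.client("rds", region_name="us-east-1")
   waiter = rds.get_waiter("db_instance_available")
   waiter.wait(
       DBInstanceIdentifier="my-db-instance",
       # Poll every 30 seconds, up to 40 attempts (~20 minutes) before timing out.
       WaiterConfig={"Delay": 30, "MaxAttempts": 40},
   )
   ```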





[GitHub] [airflow] hankehly commented on pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on PR #27076:
URL: https://github.com/apache/airflow/pull/27076#issuecomment-1281715876

   @o-nikolas @ferruzzi @eladkal @kazanzhy
   Please review and approve the latest changes addressing your feedback at your earliest convenience.




[GitHub] [airflow] hankehly commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996392853


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -59,6 +58,12 @@ def _describe_item(self, item_type: str, item_name: str) -> list:
         elif item_type == 'event_subscription':
             subscriptions = self.hook.conn.describe_event_subscriptions(SubscriptionName=item_name)
             return subscriptions['EventSubscriptionsList']
+        elif item_type == "db_instance":

Review Comment:
   These are the same changes as those made to `RdsBaseSensor` in #26003.
   Related: https://github.com/apache/airflow/issues/25952#issuecomment-1237585545





[GitHub] [airflow] vincbeck commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
vincbeck commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997327876


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   > Why? We only need to wait for the Available status, don't we?
   
   Yes, but the implementation of `_await_status` requires `wait_statuses` as well; otherwise, you'll get an exception.





[GitHub] [airflow] hankehly commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996395916


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,135 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):
+        self.log.info(f"Waiting for DB {self.db_type} to reach 'available' state")
+        if self.db_type == RdsDbType.INSTANCE:
+            self.hook.conn.get_waiter("db_instance_available").wait(DBInstanceIdentifier=self.db_identifier)
+        else:
+            self.hook.conn.get_waiter("db_cluster_available").wait(DBClusterIdentifier=self.db_identifier)
+
+
+class RdsStopDbOperator(RdsBaseOperator):

Review Comment:
   Stop an RDS instance/cluster. Optionally create a snapshot.



##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,135 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):

Review Comment:
   Start an RDS instance/cluster.






[GitHub] [airflow] hankehly commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996536928


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   @kazanzhy 
   Thanks for your review. I agree, that would be more consistent. What concerns me is that when using `_await_status` we need to specify all possible `wait_statuses`. I observed the following states when creating an RDS instance, but [depending on the settings, there may be more.](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/accessing-monitoring.html) (And they might change in the future?)
   - Creating
   - Backing-up
   - Configuring-enhanced-monitoring
   
   Rather than listing every possible state and risking missing one, I think using the built-in "waiter" implementation is more appropriate here. We only need to check for the "available" state.
   
   @o-nikolas
   I noticed you work at Amazon, is there anything you'd like to add to this discussion?
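   
   For illustration, this is roughly what a hand-rolled status poll would have to look like compared to the built-in waiter (the function name, status set, and polling interval are assumptions, not code from this PR):
   
   ```python
   import time

   # Every transitional status has to be enumerated by hand, which is exactly
   # the fragility described above; this set is illustrative, not exhaustive.
   TRANSITIONAL_STATUSES = {"creating", "starting", "backing-up", "configuring-enhanced-monitoring"}

   def wait_until_available(conn, db_identifier: str, poll_seconds: int = 30) -> None:
       while True:
           response = conn.describe_db_instances(DBInstanceIdentifier=db_identifier)
           status = response["DBInstances"][0]["DBInstanceStatus"]
           if status == "available":
               return
           if status.lower() not in TRANSITIONAL_STATUSES:
               raise RuntimeError(f"Unexpected DB instance status: {status}")
           time.sleep(poll_seconds)

   # The built-in waiter only needs the target state:
   # conn.get_waiter("db_instance_available").wait(DBInstanceIdentifier=db_identifier)
   ```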





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996488754


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   Maybe we could use the already-written `_await_status` method here.
   It would reduce the code and unify all operators within this file.
   WDYT?





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997445302


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   @eladkal I remember implementing `_await_status` to provide a single method for the different operators. It was the best solution, in my opinion, at that time. I knew about waiters, but I guess not all the waiters I needed existed back then.
   
   I agree with @ferruzzi: let's use `_await_status` in this PR and then refactor the module and add waiters where possible. WDYT?





[GitHub] [airflow] hankehly commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996395490


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,135 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)

Review Comment:
   `db_type` is a templated field. Casting to `RdsDbType` in the constructor would raise an exception when users pass template strings, so the cast is done here instead.





[GitHub] [airflow] o-nikolas commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997343335


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   Yupp, will do :+1: 





[GitHub] [airflow] hankehly commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997647005


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,135 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)

Review Comment:
   Other RDS settings are templated, so including `db_type` seemed natural. A user could generate a list of RDS instances/clusters to switch on/off at particular times of the day. Templating could be helpful in that case. Given it's a single-line change, the tradeoff between added flexibility and increased complexity seems appropriate to me here.
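   
   A small hypothetical example of why templating `db_type` can be handy (it assumes `RdsStopDbOperator` is imported and the `params` values are defined on the DAG):
   
   ```python
   stop_db = RdsStopDbOperator(
       task_id="stop_db",
       db_identifier="{{ params.db_identifier }}",
       db_type="{{ params.db_type }}",  # rendered to "instance" or "cluster" at runtime
   )
   ```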





[GitHub] [airflow] eladkal commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997307588


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   I'm confused as to why `_await_status` is at the operator level rather than at the hook level.
   
   > What concerns me is that when using _await_status we need to specify all possible wait_statuses
   
   Why? We only need to wait for the `Available` status, don't we?





[GitHub] [airflow] eladkal commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r998619999


##########
docs/apache-airflow-providers-amazon/operators/rds.rst:
##########
@@ -166,6 +166,35 @@ To delete a AWS DB instance you can use
     :start-after: [START howto_operator_rds_delete_db_instance]
     :end-before: [END howto_operator_rds_delete_db_instance]
 
+.. _howto/operator:RdsStartDbOperator:
+
+Start a database instance or cluster
+====================================
+
+To start an AWS DB instance or cluster you can use

Review Comment:
   ```suggestion
   To start an RDS DB instance or cluster you can use
   ```



##########
docs/apache-airflow-providers-amazon/operators/rds.rst:
##########
@@ -166,6 +166,35 @@ To delete a AWS DB instance you can use
     :start-after: [START howto_operator_rds_delete_db_instance]
     :end-before: [END howto_operator_rds_delete_db_instance]
 
+.. _howto/operator:RdsStartDbOperator:
+
+Start a database instance or cluster
+====================================
+
+To start an AWS DB instance or cluster you can use
+:class:`~airflow.providers.amazon.aws.operators.rds.RdsStartDbOperator`.
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_rds_instance.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_rds_start_db]
+    :end-before: [END howto_operator_rds_start_db]
+
+
+.. _howto/operator:RdsStopDbOperator:
+
+Stop a database instance or cluster
+===================================
+
+To stop an AWS DB instance or cluster you can use

Review Comment:
   ```suggestion
   To stop an RDS DB instance or cluster you can use
   ```





[GitHub] [airflow] o-nikolas commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997304815


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   Our preference for AWS code is to use the boto Waiters wherever possible. I don't have the context to know whether the existing `_await_status` method could be updated to use waiters or not, but that would be my preference rather than the other direction. But I also think that's out of scope for this PR. So how you've got it now is fine, IMHO.   





[GitHub] [airflow] eladkal commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997335480


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   @o-nikolas can you please create a follow-up task and link it to this discussion?
   `_await_status` is a private function; we can remove it or change it as we see fit - it's not a breaking change.





[GitHub] [airflow] hankehly commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996526630


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):
+        self.log.info(f"Waiting for DB {self.db_type} to reach 'available' state")
+        if self.db_type == RdsDbType.INSTANCE:
+            self.hook.conn.get_waiter("db_instance_available").wait(DBInstanceIdentifier=self.db_identifier)
+        else:
+            self.hook.conn.get_waiter("db_cluster_available").wait(DBClusterIdentifier=self.db_identifier)
+
+
+class RdsStopDbOperator(RdsBaseOperator):
+    """
+    Stops an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStopDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to stop
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param db_snapshot_identifier: The instance identifier of the DB Snapshot to create before
+        stopping the DB instance. The default value (None) skips snapshot creation. This
+        parameter is ignored when ``db_type`` is "cluster"
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to stop. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        db_snapshot_identifier: str = None,
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.db_snapshot_identifier = db_snapshot_identifier
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        stop_db_response = self._stop_db()
+        if self.wait_for_completion:
+            self._wait_until_db_stopped()
+        return json.dumps(stop_db_response, default=str)
+
+    def _stop_db(self):
+        self.log.info(f"Stopping DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            conn_params = {"DBInstanceIdentifier": self.db_identifier}
+            # The db snapshot parameter is optional, but the AWS SDK raises an exception
+            # if passed a null value. Only set snapshot id if value is present.
+            if self.db_snapshot_identifier:
+                conn_params["DBSnapshotIdentifier"] = self.db_snapshot_identifier
+            response = self.hook.conn.stop_db_instance(**conn_params)
+        else:
+            if self.db_snapshot_identifier:
+                self.log.warning(
+                    "'db_snapshot_identifier' does not apply to db clusters. Remove it to silence this warning."
+                )
+            response = self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_stopped(self):

Review Comment:
   It's used around lines 802-808. (Could there be a misunderstanding?)






[GitHub] [airflow] o-nikolas commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996454757


##########
tests/system/providers/amazon/aws/example_rds_instance.py:
##########
@@ -71,6 +73,22 @@
     )
     # [END howto_sensor_rds_instance]
 
+    # [START howto_operator_rds_stop_db]
+    stop_db_instance = RdsStopDbOperator(
+        task_id="stop_db_instance",
+        db_identifier=rds_db_identifier,
+        wait_for_completion=True,

Review Comment:
   `wait_for_completion=True` is the default value; you can drop it from this call (similar to how `RdsCreateDbInstanceOperator` does above).
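   
   i.e., something along these lines (sketch only, keeping the existing task id and identifier variable):
   
   ```python
   stop_db_instance = RdsStopDbOperator(
       task_id="stop_db_instance",
       db_identifier=rds_db_identifier,
   )
   ```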



##########
tests/providers/amazon/aws/operators/test_rds.py:
##########
@@ -767,3 +769,127 @@ def test_delete_event_subscription_no_wait(self, mock_await_status):
         with pytest.raises(self.hook.conn.exceptions.ClientError):
             self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
         assert mock_await_status.not_called()
+
+
+@pytest.mark.skipif(mock_rds is None, reason="mock_rds package not present")
+class TestRdsStopDbOperator:
+    @classmethod
+    def setup_class(cls):
+        cls.dag = DAG("test_dag", default_args={"owner": "airflow", "start_date": DEFAULT_DATE})
+        cls.hook = RdsHook(aws_conn_id=AWS_CONN, region_name="us-east-1")
+        _patch_hook_get_connection(cls.hook)
+
+    @classmethod
+    def teardown_class(cls):
+        del cls.dag
+        del cls.hook
+
+    @mock_rds
+    @patch.object(RdsBaseOperator, "_await_status")
+    def test_stop_db_instance(self, mock_await_status):
+        _create_db_instance(self.hook)
+        stop_db_instance = RdsStopDbOperator(task_id="test_stop_db_instance", db_identifier=DB_INSTANCE_NAME)
+        _patch_hook_get_connection(stop_db_instance.hook)
+        stop_db_instance.execute(None)
+        result = self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
+        status = result["DBInstances"][0]["DBInstanceStatus"]
+        assert status == "stopped"
+        mock_await_status.assert_called()
+
+    @mock_rds
+    @patch.object(RdsBaseOperator, "_await_status")
+    def test_stop_db_instance_no_wait(self, mock_await_status):
+        _create_db_instance(self.hook)
+        stop_db_instance = RdsStopDbOperator(
+            task_id="test_stop_db_instance_no_wait", db_identifier=DB_INSTANCE_NAME, wait_for_completion=False
+        )
+        _patch_hook_get_connection(stop_db_instance.hook)
+        stop_db_instance.execute(None)
+        result = self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
+        status = result["DBInstances"][0]["DBInstanceStatus"]
+        assert status == "stopped"
+        mock_await_status.assert_not_called()
+
+    @mock_rds
+    def test_stop_db_instance_create_snapshot(self):
+        _create_db_instance(self.hook)
+        stop_db_instance = RdsStopDbOperator(
+            task_id="test_stop_db_instance_create_snapshot",
+            db_identifier=DB_INSTANCE_NAME,
+            db_snapshot_identifier=DB_INSTANCE_SNAPSHOT,
+        )
+        _patch_hook_get_connection(stop_db_instance.hook)
+        stop_db_instance.execute(None)
+
+        describe_result = self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
+        status = describe_result["DBInstances"][0]['DBInstanceStatus']
+        assert status == "stopped"
+
+        snapshot_result = self.hook.conn.describe_db_snapshots(DBSnapshotIdentifier=DB_INSTANCE_SNAPSHOT)
+        instance_snapshots = snapshot_result.get("DBSnapshots")
+        assert instance_snapshots
+        assert len(instance_snapshots) == 1
+

Review Comment:
   It might be worth adding a test for stopping the DB cluster with a snapshot and using the `caplog` fixture (there are examples in other Airflow tests) to ensure your warning message is logged.
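
   A minimal sketch of such a test, assuming the cluster-level helpers and constants follow the same pattern as the instance-level ones in this file (a `_create_db_cluster` helper and `DB_CLUSTER_NAME` / `DB_CLUSTER_SNAPSHOT` constants are assumptions here, and `logging` is assumed to be imported at module level):

   ```python
   @mock_rds
   def test_stop_db_cluster_create_snapshot_logs_warning_message(self, caplog):
       # _create_db_cluster / DB_CLUSTER_NAME / DB_CLUSTER_SNAPSHOT are hypothetical
       # counterparts of the instance-level helpers used elsewhere in this file
       _create_db_cluster(self.hook)
       stop_db_cluster = RdsStopDbOperator(
           task_id="test_stop_db_cluster_create_snapshot",
           db_identifier=DB_CLUSTER_NAME,
           db_type="cluster",
           db_snapshot_identifier=DB_CLUSTER_SNAPSHOT,
       )
       _patch_hook_get_connection(stop_db_cluster.hook)
       with caplog.at_level(logging.WARNING, logger=stop_db_cluster.log.name):
           stop_db_cluster.execute(None)
       # The operator logs this warning when a snapshot id is passed for a cluster
       assert "'db_snapshot_identifier' does not apply to db clusters." in caplog.text
   ```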





[GitHub] [airflow] ferruzzi commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
ferruzzi commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997434636


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   Just dogpiling on the "we should be using waiters everywhere" train. There are a few other places in the package (the EKS operators I wrote before I knew about the boto waiters, for example) where we have custom wait methods, but the official boto ones would be cleaner and easier alternatives. It might make a nice task/project for new contributors to go through and see where they can be swapped out?
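
   For reference, a rough sketch of what replacing a hand-rolled wait loop with an official boto waiter looks like (illustrative identifiers; `db_instance_available` is the same waiter this PR already uses):

   ```python
   import boto3

   rds = boto3.client("rds")

   # Polls describe_db_instances until the instance reaches "available"
   rds.get_waiter("db_instance_available").wait(
       DBInstanceIdentifier="my-db-instance",          # example identifier
       WaiterConfig={"Delay": 30, "MaxAttempts": 40},  # optional polling overrides
   )
   ```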





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997445302


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   @eladkal I remember that I implemented `_await_status` so the different operators could share a single method. It was the best solution, in my opinion, at the time. I knew about waiters, but not all of the waiters I needed existed back then.
   
   I agree with @ferruzzi: let's use `_await_status` in this PR and then refactor the module and add waiters where possible. WDYT?





[GitHub] [airflow] hankehly commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997643222


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   Thank you all for the feedback. Consensus:
   * Leave current changes as-is
   * Refactor waiting logic in [#27096](https://github.com/apache/airflow/issues/27096)





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996488836


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):
+        self.log.info(f"Waiting for DB {self.db_type} to reach 'available' state")
+        if self.db_type == RdsDbType.INSTANCE:
+            self.hook.conn.get_waiter("db_instance_available").wait(DBInstanceIdentifier=self.db_identifier)
+        else:
+            self.hook.conn.get_waiter("db_cluster_available").wait(DBClusterIdentifier=self.db_identifier)
+
+
+class RdsStopDbOperator(RdsBaseOperator):
+    """
+    Stops an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStopDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to stop
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param db_snapshot_identifier: The instance identifier of the DB Snapshot to create before
+        stopping the DB instance. The default value (None) skips snapshot creation. This
+        parameter is ignored when ``db_type`` is "cluster"
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to stop. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        db_snapshot_identifier: str = None,
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.db_snapshot_identifier = db_snapshot_identifier
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        stop_db_response = self._stop_db()
+        if self.wait_for_completion:
+            self._wait_until_db_stopped()
+        return json.dumps(stop_db_response, default=str)
+
+    def _stop_db(self):
+        self.log.info(f"Stopping DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            conn_params = {"DBInstanceIdentifier": self.db_identifier}
+            # The db snapshot parameter is optional, but the AWS SDK raises an exception
+            # if passed a null value. Only set snapshot id if value is present.
+            if self.db_snapshot_identifier:
+                conn_params["DBSnapshotIdentifier"] = self.db_snapshot_identifier
+            response = self.hook.conn.stop_db_instance(**conn_params)
+        else:
+            if self.db_snapshot_identifier:
+                self.log.warning(
+                    "'db_snapshot_identifier' does not apply to db clusters. Remove it to silence this warning."
+                )
+            response = self.hook.conn.stop_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_stopped(self):

Review Comment:
   The same suggestion. Let's use the `_await_status` method here.
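
   For what it's worth, a rough idea of what that could look like here. The argument names below are only inferred from how other operators in this module call `_await_status`, so treat the exact signature as an assumption:

   ```python
   # Hypothetical call; verify the argument names against RdsBaseOperator._await_status
   self._await_status(
       "DB instance",
       self.db_identifier,
       wait_statuses=["stopping"],
       ok_statuses=["stopped"],
   )
   ```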





[GitHub] [airflow] kazanzhy commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
kazanzhy commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996487826


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,135 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)

Review Comment:
   Can you describe the reason for making `db_type` a templated field?
   It can only be either `instance` or `cluster`, so do we need to make it more complex?





[GitHub] [airflow] hankehly commented on pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on PR #27076:
URL: https://github.com/apache/airflow/pull/27076#issuecomment-1279922083

   @potiuk @ferruzzi @kazanzhy @o-nikolas @vincbeck 
   Please review this PR at your earliest convenience.




[GitHub] [airflow] hankehly commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997640135


##########
tests/providers/amazon/aws/operators/test_rds.py:
##########
@@ -767,3 +769,127 @@ def test_delete_event_subscription_no_wait(self, mock_await_status):
         with pytest.raises(self.hook.conn.exceptions.ClientError):
             self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
         assert mock_await_status.not_called()
+
+
+@pytest.mark.skipif(mock_rds is None, reason="mock_rds package not present")
+class TestRdsStopDbOperator:
+    @classmethod
+    def setup_class(cls):
+        cls.dag = DAG("test_dag", default_args={"owner": "airflow", "start_date": DEFAULT_DATE})
+        cls.hook = RdsHook(aws_conn_id=AWS_CONN, region_name="us-east-1")
+        _patch_hook_get_connection(cls.hook)
+
+    @classmethod
+    def teardown_class(cls):
+        del cls.dag
+        del cls.hook
+
+    @mock_rds
+    @patch.object(RdsBaseOperator, "_await_status")
+    def test_stop_db_instance(self, mock_await_status):
+        _create_db_instance(self.hook)
+        stop_db_instance = RdsStopDbOperator(task_id="test_stop_db_instance", db_identifier=DB_INSTANCE_NAME)
+        _patch_hook_get_connection(stop_db_instance.hook)
+        stop_db_instance.execute(None)
+        result = self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
+        status = result["DBInstances"][0]["DBInstanceStatus"]
+        assert status == "stopped"
+        mock_await_status.assert_called()
+
+    @mock_rds
+    @patch.object(RdsBaseOperator, "_await_status")
+    def test_stop_db_instance_no_wait(self, mock_await_status):
+        _create_db_instance(self.hook)
+        stop_db_instance = RdsStopDbOperator(
+            task_id="test_stop_db_instance_no_wait", db_identifier=DB_INSTANCE_NAME, wait_for_completion=False
+        )
+        _patch_hook_get_connection(stop_db_instance.hook)
+        stop_db_instance.execute(None)
+        result = self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
+        status = result["DBInstances"][0]["DBInstanceStatus"]
+        assert status == "stopped"
+        mock_await_status.assert_not_called()
+
+    @mock_rds
+    def test_stop_db_instance_create_snapshot(self):
+        _create_db_instance(self.hook)
+        stop_db_instance = RdsStopDbOperator(
+            task_id="test_stop_db_instance_create_snapshot",
+            db_identifier=DB_INSTANCE_NAME,
+            db_snapshot_identifier=DB_INSTANCE_SNAPSHOT,
+        )
+        _patch_hook_get_connection(stop_db_instance.hook)
+        stop_db_instance.execute(None)
+
+        describe_result = self.hook.conn.describe_db_instances(DBInstanceIdentifier=DB_INSTANCE_NAME)
+        status = describe_result["DBInstances"][0]['DBInstanceStatus']
+        assert status == "stopped"
+
+        snapshot_result = self.hook.conn.describe_db_snapshots(DBSnapshotIdentifier=DB_INSTANCE_SNAPSHOT)
+        instance_snapshots = snapshot_result.get("DBSnapshots")
+        assert instance_snapshots
+        assert len(instance_snapshots) == 1
+

Review Comment:
   Nice idea. Please see `test_stop_db_cluster_create_snapshot_logs_warning_message` in the same file.





[GitHub] [airflow] hankehly commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
hankehly commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r996395490


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,135 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)

Review Comment:
   `db_type` is a templated field. Casting to `RdsDbType` in the constructor will raise an exception when users pass template strings, so I'm doing it here instead.
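
   To illustrate why the cast has to wait until `execute()` (illustrative snippet, not from the PR), a user could pass the type through a template that is only rendered at run time:

   ```python
   # "{{ params.db_type }}" is still a raw Jinja string at construction time,
   # so RdsDbType("{{ params.db_type }}") in __init__ would raise ValueError.
   start_db = RdsStartDbOperator(
       task_id="start_db",
       db_identifier="{{ params.db_identifier }}",
       db_type="{{ params.db_type }}",
       params={"db_identifier": "my-db-instance", "db_type": "cluster"},
   )
   ```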





[GitHub] [airflow] o-nikolas commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997325535


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   > I'm confused on why the _await_status is on operator level rather than on Hook level.
   
   Agreed, I think we should ultimately deprecate this thing in favour of boto waiters (and some shared hook method if there are some situations where waiters don't suffice). But I still think that work is out of scope for this PR. 





[GitHub] [airflow] o-nikolas commented on a diff in pull request #27076: Add RdsStopDbOperator and RdsStartDbOperator

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #27076:
URL: https://github.com/apache/airflow/pull/27076#discussion_r997348259


##########
airflow/providers/amazon/aws/operators/rds.py:
##########
@@ -672,6 +682,132 @@ def execute(self, context: Context) -> str:
         return json.dumps(delete_db_instance, default=str)
 
 
+class RdsStartDbOperator(RdsBaseOperator):
+    """
+    Starts an RDS DB instance / cluster
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RdsStartDbOperator`
+
+    :param db_identifier: The AWS identifier of the DB to start
+    :param db_type: Type of the DB - either "instance" or "cluster" (default: "instance")
+    :param aws_conn_id: The Airflow connection used for AWS credentials. (default: "aws_default")
+    :param wait_for_completion:  If True, waits for DB to start. (default: True)
+    """
+
+    template_fields = ("db_identifier", "db_type")
+
+    def __init__(
+        self,
+        *,
+        db_identifier: str,
+        db_type: str = "instance",
+        aws_conn_id: str = "aws_default",
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(aws_conn_id=aws_conn_id, **kwargs)
+        self.db_identifier = db_identifier
+        self.db_type = db_type
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: Context) -> str:
+        self.db_type = RdsDbType(self.db_type)
+        start_db_response = self._start_db()
+        if self.wait_for_completion:
+            self._wait_until_db_available()
+        return json.dumps(start_db_response, default=str)
+
+    def _start_db(self):
+        self.log.info(f"Starting DB {self.db_type} '{self.db_identifier}'")
+        if self.db_type == RdsDbType.INSTANCE:
+            response = self.hook.conn.start_db_instance(DBInstanceIdentifier=self.db_identifier)
+        else:
+            response = self.hook.conn.start_db_cluster(DBClusterIdentifier=self.db_identifier)
+        return response
+
+    def _wait_until_db_available(self):

Review Comment:
   #27096


