Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/11/08 20:33:51 UTC

[GitHub] [airflow] Anton-Shutik commented on pull request #27559: Generic RDS Operators

Anton-Shutik commented on PR #27559:
URL: https://github.com/apache/airflow/pull/27559#issuecomment-1307793423

   Yeah, your idea is great as well.
   Then we could go with a more generic `Boto3Operator`:
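   ```python
   from __future__ import annotations
   
   import json
   from typing import TYPE_CHECKING, Any, Callable, Sequence
   
   from airflow.models import BaseOperator
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   
   if TYPE_CHECKING:
       from airflow.utils.context import Context
   
   
   class Boto3Operator(BaseOperator):
       # Templating boto3_params lets XComArgs such as `task.output` resolve at run time.
       template_fields: Sequence[str] = ("boto3_params",)
   
       def __init__(self, *args, boto3_client: str, boto3_callable: str, aws_conn_id: str = "aws_default",
                    boto3_params: dict | None = None, result_handler: Callable[[Any], Any] | None = None, **kwargs):
           super().__init__(*args, **kwargs)
           self.aws_conn_id = aws_conn_id
           self.boto3_client = boto3_client
           self.boto3_callable = boto3_callable
           self.boto3_params = boto3_params or {}
           self.result_handler = result_handler or (lambda x: x)
   
       def execute(self, context: Context) -> Any:
           # Create the hook at run time rather than at DAG-parse time.
           client = AwsBaseHook(aws_conn_id=self.aws_conn_id, client_type=self.boto3_client).get_conn()
           if not hasattr(client, self.boto3_callable):
               raise AttributeError(f"{self.boto3_callable} does not exist on boto3.client('{self.boto3_client}')")
           # boto3 client methods accept keyword arguments only, hence the ** unpacking.
           result = self.result_handler(getattr(client, self.boto3_callable)(**self.boto3_params))
           if not isinstance(result, str):
               return json.dumps(result, default=str)
           return result
   ```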
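The heart of this proposal is name-based dispatch: look `boto3_callable` up on the client with `getattr`, call it with the request parameters as keyword arguments, pass the response through `result_handler`, and serialise anything that is not already a string. Below is a minimal Airflow-free sketch of that flow; `call_boto3` and `FakeRdsClient` are hypothetical names, and the fake client only stands in for `boto3.client("rds")` so the logic can run without AWS credentials.

```python
from __future__ import annotations

import json
from datetime import datetime
from typing import Any, Callable


class FakeRdsClient:
    """Hypothetical stand-in for boto3.client("rds"); only one method is stubbed."""

    def describe_db_snapshots(self, **kwargs: Any) -> dict[str, Any]:
        # Echo the filter back and include a datetime, as real boto3 responses do.
        return {
            "DBSnapshots": [
                {
                    "DBInstanceIdentifier": kwargs["DBInstanceIdentifier"],
                    "SnapshotCreateTime": datetime(2022, 11, 1, 12, 0),
                }
            ]
        }


def call_boto3(
    client: Any,
    boto3_callable: str,
    boto3_params: dict[str, Any] | None = None,
    result_handler: Callable[[Any], Any] | None = None,
) -> str:
    """Resolve a client method by name, call it, and return a string suitable for XCom."""
    method = getattr(client, boto3_callable, None)
    if not callable(method):
        raise AttributeError(f"{boto3_callable!r} does not exist on {type(client).__name__}")
    # boto3 client methods accept keyword arguments only, hence the ** unpacking.
    result = (result_handler or (lambda x: x))(method(**(boto3_params or {})))
    # datetime values are not JSON-serialisable, so default=str converts them to strings.
    return result if isinstance(result, str) else json.dumps(result, default=str)
```

For example, `call_boto3(FakeRdsClient(), "describe_db_snapshots", {"DBInstanceIdentifier": "prod"})` returns a JSON string in which `SnapshotCreateTime` has become `"2022-11-01 12:00:00"`, because `default=str` is applied to the `datetime`. Raising `AttributeError` instead of using `assert` keeps the check active when Python runs with `-O`.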
   
   Then we would use it like this:
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   
   AWS_CONNECTION_ID = "aws_default"  # placeholder: Airflow's default AWS connection id
   
   with DAG(
       "example_rds",
       start_date=datetime(2021, 1, 1),
       catchup=False,
   ) as dag:
   
       find_latest_production_snapshot = Boto3Operator(
           task_id="find_latest_production_snapshot",
           aws_conn_id=AWS_CONNECTION_ID,
           boto3_client="rds",
           boto3_callable="describe_db_snapshots",
           boto3_params={
               "DBInstanceIdentifier": "production-database-name",
           },
           # Pick the most recent snapshot and return its ARN
           result_handler=lambda results: sorted(
               results["DBSnapshots"], key=lambda x: x["SnapshotCreateTime"], reverse=True
           )[0]["DBSnapshotArn"],
       )
   
       restore_from_snapshot = Boto3Operator(
           task_id="restore_from_snapshot",
           aws_conn_id=AWS_CONNECTION_ID,
           boto3_client="rds",
           boto3_callable="restore_db_instance_from_db_snapshot",
           boto3_params={
               "DBInstanceIdentifier": "rds-instance-identifier",
               "DBSnapshotIdentifier": find_latest_production_snapshot.output,
               # ...
           },
           result_handler=lambda x: None,
       )
   ```
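The `result_handler` in `find_latest_production_snapshot` sorts every snapshot only to take the first one. A sketch of the same selection as a named, testable function, assuming the response shape of `describe_db_snapshots`; `latest_snapshot_arn` is a hypothetical name, and the sample data is made up and trimmed to the two keys the handler reads:

```python
from datetime import datetime, timezone
from typing import Any


def latest_snapshot_arn(response: dict[str, Any]) -> str:
    """Return the ARN of the most recently created snapshot in a describe_db_snapshots response."""
    snapshots = response.get("DBSnapshots", [])
    if not snapshots:
        # sorted(...)[0] would raise a bare IndexError here; fail with context instead.
        raise ValueError("describe_db_snapshots returned no snapshots")
    # max() finds the newest snapshot in one pass instead of sorting the whole list.
    return max(snapshots, key=lambda s: s["SnapshotCreateTime"])["DBSnapshotArn"]


# Made-up sample data in the shape of a boto3 describe_db_snapshots response.
sample = {
    "DBSnapshots": [
        {
            "DBSnapshotArn": "arn:aws:rds:us-east-1:123456789012:snapshot:older",
            "SnapshotCreateTime": datetime(2022, 11, 1, tzinfo=timezone.utc),
        },
        {
            "DBSnapshotArn": "arn:aws:rds:us-east-1:123456789012:snapshot:newer",
            "SnapshotCreateTime": datetime(2022, 11, 7, tzinfo=timezone.utc),
        },
    ]
}
```

Passing `result_handler=latest_snapshot_arn` would then replace the lambda. Note that `describe_db_snapshots` returns one page of results (at most `MaxRecords`, 100 by default), so an instance with more snapshots than that would need pagination before this selection is reliable.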
   
   Or, if users want their own operator (for example, for a call they make frequently), they can subclass it:
   
   ```python
   class RdsRestoreDbInstanceFromSnapshotOperator(Boto3Operator):
       """Restore a DB snapshot into a new RDS instance."""
   
       def __init__(self, *args, rds_client_kwargs: dict | None = None, **kwargs):
           super().__init__(
               *args,
               boto3_client="rds",
               boto3_callable="restore_db_instance_from_db_snapshot",
               boto3_params=rds_client_kwargs,
               **kwargs,
           )
   ```
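Because the subclass passes `boto3_client` and `boto3_callable` itself and forwards everything else through `**kwargs`, a caller who also passes `boto3_client` gets a `TypeError` ("got multiple values for keyword argument") rather than silently overriding the pinned client. A plain-Python sketch of that constructor chain, where `Boto3CallSpec` and `RestoreFromSnapshotSpec` are hypothetical stand-ins for the operator classes so it runs without Airflow:

```python
from __future__ import annotations

from typing import Any


class Boto3CallSpec:
    """Hypothetical, Airflow-free stand-in for Boto3Operator's constructor."""

    def __init__(self, *, boto3_client: str, boto3_callable: str,
                 boto3_params: dict[str, Any] | None = None, **kwargs: Any) -> None:
        self.boto3_client = boto3_client
        self.boto3_callable = boto3_callable
        self.boto3_params = boto3_params or {}
        self.extra = kwargs  # e.g. task_id or aws_conn_id in the real operator


class RestoreFromSnapshotSpec(Boto3CallSpec):
    """Pins the client and method; callers pass only the request parameters."""

    def __init__(self, *, rds_client_kwargs: dict[str, Any] | None = None, **kwargs: Any) -> None:
        super().__init__(
            boto3_client="rds",
            boto3_callable="restore_db_instance_from_db_snapshot",
            boto3_params=rds_client_kwargs,
            **kwargs,
        )
```

With this pattern, `RestoreFromSnapshotSpec(task_id="restore", rds_client_kwargs={"DBInstanceIdentifier": "restored-db"})` ends up with `boto3_client == "rds"`, while `RestoreFromSnapshotSpec(boto3_client="ec2")` fails immediately at construction.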
   

