You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/11/08 20:33:51 UTC
[GitHub] [airflow] Anton-Shutik commented on pull request #27559: Generic RDS Operators
Anton-Shutik commented on PR #27559:
URL: https://github.com/apache/airflow/pull/27559#issuecomment-1307793423
Yeah, your idea is great as well.
Then we can go with a more generic `Boto3Operator`:
```python
class Boto3Operator(BaseOperator):
    """Call an arbitrary boto3 client method and return its post-processed result.

    :param aws_conn_id: Airflow connection ID handed to ``AwsBaseHook``.
    :param boto3_client: boto3 client type to create, e.g. ``"rds"``.
    :param boto3_callable: name of the client method to invoke,
        e.g. ``"describe_db_snapshots"``.
    :param boto3_params: keyword arguments passed to the client method;
        ``None`` means "no arguments".
    :param result_handler: optional callable applied to the raw response
        before it is returned; the response is passed through unchanged
        when omitted.
    :raises ValueError: if ``boto3_client`` or ``boto3_callable`` is missing.
    """

    # Declared so Airflow renders templates and resolves XComArg values
    # (e.g. ``other_task.output``) nested inside ``boto3_params`` before
    # ``execute`` runs.
    template_fields = ("boto3_params",)

    def __init__(
        self,
        *args,
        # NOTE(review): the default is the literal string "aws_conn_id";
        # Airflow's stock AWS connection is usually "aws_default" - confirm.
        aws_conn_id: str = "aws_conn_id",
        boto3_client: str | None = None,
        boto3_callable: str | None = None,
        boto3_params: dict | None = None,
        result_handler: Any = None,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # Validate with a real exception: ``assert`` is stripped under -O.
        if not boto3_client or not boto3_callable:
            raise ValueError("Both boto3_client and boto3_callable are required")
        # Only plain values are stored here. The hook/client is created in
        # execute() so that instantiating the operator (which happens every
        # time the DAG file is parsed) does not need AWS credentials.
        self.aws_conn_id = aws_conn_id
        self.boto3_client = boto3_client
        self.boto3_callable = boto3_callable
        self.boto3_params = boto3_params
        self.result_handler = result_handler or (lambda x: x)

    def execute(self, context: Context) -> Any:
        """Invoke the configured client method and return the handled result.

        :param context: Airflow task context (unused).
        :return: the handler's result if it is a ``str``; otherwise the result
            serialized with ``json.dumps(..., default=str)``.
        :raises AttributeError: if the client has no callable named
            ``boto3_callable``.
        """
        client = AwsBaseHook(
            aws_conn_id=self.aws_conn_id, client_type=self.boto3_client
        ).conn
        boto3_handler = getattr(client, self.boto3_callable, None)
        if not callable(boto3_handler):
            raise AttributeError(
                f"{self.boto3_callable} does not exist on boto3.Client('{self.boto3_client}')"
            )
        # boto3 client methods take keyword arguments, so the dict must be
        # unpacked rather than passed as a single positional argument.
        response = boto3_handler(**(self.boto3_params or {}))
        result = self.result_handler(response)
        if not isinstance(result, str):
            # default=str stringifies values json cannot encode natively
            # (e.g. datetime objects in boto3 responses).
            return json.dumps(result, default=str)
        return result
```
Then we use it like:
```python
# Example DAG: look up the newest snapshot of the production database, then
# restore it into another RDS instance.
with models.DAG(
    "example_rds",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    find_latest_production_snapshot = Boto3Operator(
        task_id='find_latest_production_snapshot',
        aws_conn_id=AWS_CONNECTION_ID,
        boto3_client="rds",
        boto3_callable="describe_db_snapshots",
        boto3_params={
            "DBInstanceIdentifier": "production-database-name",
        },
        # Picks the snapshot with the newest SnapshotCreateTime and returns
        # its ARN. max() selects the same element as sorting descending and
        # taking index 0, without sorting the whole list.
        result_handler=lambda results: max(
            results["DBSnapshots"],
            key=lambda x: x["SnapshotCreateTime"],
        )["DBSnapshotArn"],
    )

    restoure_from_snapshot = Boto3Operator(
        task_id='restoure_from_snapshot',
        aws_conn_id=AWS_CONNECTION_ID,
        boto3_client="rds",
        boto3_callable="restore_db_instance_from_db_snapshot",
        boto3_params={
            "DBInstanceIdentifier": "rds_instance_identifier",
            # Value returned (via XCom) by the task defined above.
            "DBSnapshotIdentifier": find_latest_production_snapshot.output,
            # ... add any further restore_db_instance_from_db_snapshot
            # parameters here
        },
        # The restore response is not needed downstream, so discard it.
        result_handler=lambda x: None,
    )
```
Or, if users prefer to have their own operator (e.g. because it is used frequently), they can define one:
```python
class RdsRestoreDbInstanceFromSnapshotOperator(Boto3Operator):
    """Restore a DB snapshot into a new RDS instance.

    Thin specialization of ``Boto3Operator`` that pins the client to ``"rds"``
    and the method to ``"restore_db_instance_from_db_snapshot"``.

    :param rds_client_kwargs: forwarded to the parent as ``boto3_params``.
    """

    def __init__(self, *args, rds_client_kwargs: dict | None = None, **kwargs):
        # Fixed arguments for the parent; everything else is passed through.
        preset = {
            "boto3_client": "rds",
            "boto3_callable": "restore_db_instance_from_db_snapshot",
            "boto3_params": rds_client_kwargs,
        }
        super().__init__(*args, **preset, **kwargs)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org