You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/03/19 12:56:12 UTC
[airflow] branch main updated: Update sample dag and doc for S3CreateBucketOperator, S3PutBucketTaggingOperator, S3GetBucketTaggingOperator, S3DeleteBucketTaggingOperator, S3DeleteBucketOperator (#22312)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5eb6335 Update sample dag and doc for S3CreateBucketOperator, S3PutBucketTaggingOperator, S3GetBucketTaggingOperator, S3DeleteBucketTaggingOperator, S3DeleteBucketOperator (#22312)
5eb6335 is described below
commit 5eb63357426598f99ed50b002b72aebdf8790f73
Author: vincbeck <97...@users.noreply.github.com>
AuthorDate: Sat Mar 19 08:55:11 2022 -0400
Update sample dag and doc for S3CreateBucketOperator, S3PutBucketTaggingOperator, S3GetBucketTaggingOperator, S3DeleteBucketTaggingOperator, S3DeleteBucketOperator (#22312)
---
.../amazon/aws/example_dags/example_s3_bucket.py | 73 +++++++++++-------
.../aws/example_dags/example_s3_bucket_tagging.py | 62 ---------------
.../operators/s3.rst | 90 +++++++++++++---------
3 files changed, 98 insertions(+), 127 deletions(-)
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
index 538538d..d04bbab 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
@@ -14,50 +14,65 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-# Ignore missing args provided by default_args
-# type: ignore[call-arg]
import os
from datetime import datetime
-from airflow.decorators import task
+from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.providers.amazon.aws.operators.s3 import (
+ S3CreateBucketOperator,
+ S3DeleteBucketOperator,
+ S3DeleteBucketTaggingOperator,
+ S3GetBucketTaggingOperator,
+ S3PutBucketTaggingOperator,
+)
BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345')
-
-
-# [START howto_operator_s3_bucket]
-@task(task_id="s3_bucket_dag_add_keys_to_bucket")
-def upload_keys():
- """This is a python callback to add keys into the s3 bucket"""
- # add keys to bucket
- s3_hook = S3Hook()
- for i in range(0, 3):
- s3_hook.load_string(
- string_data="input",
- key=f"path/data{i}",
- bucket_name=BUCKET_NAME,
- )
-
+TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
+TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')
with DAG(
- dag_id='s3_bucket_dag',
+ dag_id='example_s3_bucket',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
- default_args={"bucket_name": BUCKET_NAME},
- max_active_runs=1,
tags=['example'],
) as dag:
+ # [START howto_operator_s3_create_bucket]
+ create_bucket = S3CreateBucketOperator(
+ task_id='s3_create_bucket',
+ bucket_name=BUCKET_NAME,
+ )
+ # [END howto_operator_s3_create_bucket]
+
+ # [START howto_operator_s3_put_bucket_tagging]
+ put_tagging = S3PutBucketTaggingOperator(
+ task_id='s3_put_bucket_tagging',
+ bucket_name=BUCKET_NAME,
+ key=TAG_KEY,
+ value=TAG_VALUE,
+ )
+ # [END howto_operator_s3_put_bucket_tagging]
- create_bucket = S3CreateBucketOperator(task_id='s3_bucket_dag_create', region_name='us-east-1')
+ # [START howto_operator_s3_get_bucket_tagging]
+ get_tagging = S3GetBucketTaggingOperator(
+ task_id='s3_get_bucket_tagging',
+ bucket_name=BUCKET_NAME,
+ )
+ # [END howto_operator_s3_get_bucket_tagging]
- # Using a task-decorated function to add keys
- add_keys_to_bucket = upload_keys()
+ # [START howto_operator_s3_delete_bucket_tagging]
+ delete_tagging = S3DeleteBucketTaggingOperator(
+ task_id='s3_delete_bucket_tagging',
+ bucket_name=BUCKET_NAME,
+ )
+ # [END howto_operator_s3_delete_bucket_tagging]
- delete_bucket = S3DeleteBucketOperator(task_id='s3_bucket_dag_delete', force_delete=True)
+ # [START howto_operator_s3_delete_bucket]
+ delete_bucket = S3DeleteBucketOperator(
+ task_id='s3_delete_bucket', bucket_name=BUCKET_NAME, force_delete=True
+ )
+ # [END howto_operator_s3_delete_bucket]
- create_bucket >> add_keys_to_bucket >> delete_bucket
- # [END howto_operator_s3_bucket]
+ chain(create_bucket, put_tagging, get_tagging, delete_tagging, delete_bucket)
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py b/airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py
deleted file mode 100644
index 7312dcc..0000000
--- a/airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Ignore missing args provided by default_args
-# type: ignore[call-arg]
-
-import os
-from datetime import datetime
-
-from airflow.models.dag import DAG
-from airflow.providers.amazon.aws.operators.s3 import (
- S3CreateBucketOperator,
- S3DeleteBucketOperator,
- S3DeleteBucketTaggingOperator,
- S3GetBucketTaggingOperator,
- S3PutBucketTaggingOperator,
-)
-
-BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-s3-bucket-tagging')
-TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
-TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')
-
-
-# [START howto_operator_s3_bucket_tagging]
-with DAG(
- dag_id='s3_bucket_tagging_dag',
- schedule_interval=None,
- start_date=datetime(2021, 1, 1),
- catchup=False,
- default_args={"bucket_name": BUCKET_NAME},
- max_active_runs=1,
- tags=['example'],
-) as dag:
-
- create_bucket = S3CreateBucketOperator(task_id='s3_bucket_tagging_dag_create', region_name='us-east-1')
-
- delete_bucket = S3DeleteBucketOperator(task_id='s3_bucket_tagging_dag_delete', force_delete=True)
-
- get_tagging = S3GetBucketTaggingOperator(task_id='s3_bucket_tagging_dag_get_tagging')
-
- put_tagging = S3PutBucketTaggingOperator(
- task_id='s3_bucket_tagging_dag_put_tagging', key=TAG_KEY, value=TAG_VALUE
- )
-
- delete_tagging = S3DeleteBucketTaggingOperator(task_id='s3_bucket_tagging_dag_delete_tagging')
-
- create_bucket >> put_tagging >> get_tagging >> delete_tagging >> delete_bucket
- # [END howto_operator_s3_bucket_tagging]
diff --git a/docs/apache-airflow-providers-amazon/operators/s3.rst b/docs/apache-airflow-providers-amazon/operators/s3.rst
index 29c58af..82868b6 100644
--- a/docs/apache-airflow-providers-amazon/operators/s3.rst
+++ b/docs/apache-airflow-providers-amazon/operators/s3.rst
@@ -19,14 +19,6 @@
Amazon S3 Operators
====================
-Prerequisite Tasks
-------------------
-
-.. include:: _partials/prerequisite_tasks.rst
-
-Overview
---------
-
Airflow to Amazon Simple Storage Service (S3) integration provides several operators to create and interact with S3 buckets.
- :class:`~airflow.providers.amazon.aws.sensors.s3.S3KeySensor`
@@ -43,55 +35,81 @@ Airflow to Amazon Simple Storage Service (S3) integration provides several opera
- :class:`~airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator`
- :class:`~airflow.providers.amazon.aws.operators.s3.S3ListOperator`
-Two example_dags are provided which showcase these operators in action.
+Prerequisite Tasks
+------------------
- - example_s3_bucket.py
- - example_s3_bucket_tagging.py
+.. include:: _partials/prerequisite_tasks.rst
.. _howto/operator:S3CreateBucketOperator:
+
+Create an Amazon S3 Bucket
+--------------------------
+
+To create an Amazon S3 bucket you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_s3_create_bucket]
+ :end-before: [END howto_operator_s3_create_bucket]
+
.. _howto/operator:S3DeleteBucketOperator:
-Create and Delete Amazon S3 Buckets
------------------------------------
+Delete an Amazon S3 Bucket
+--------------------------
-Purpose
-"""""""
+To delete an Amazon S3 bucket you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketOperator`.
-This example dag ``example_s3_bucket.py`` uses ``S3CreateBucketOperator`` and ``S3DeleteBucketOperator`` to create a
-new S3 bucket with a given bucket name then delete it.
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_s3_delete_bucket]
+ :end-before: [END howto_operator_s3_delete_bucket]
-Defining tasks
-""""""""""""""
+.. _howto/operator:S3PutBucketTaggingOperator:
-In the following code we create a new bucket, add keys, and then delete the bucket.
+Set the tags for an Amazon S3 Bucket
+------------------------------------
+
+To set the tags for an Amazon S3 bucket you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3PutBucketTaggingOperator`.
.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
:language: python
- :start-after: [START howto_operator_s3_bucket]
- :end-before: [END howto_operator_s3_bucket]
+ :dedent: 4
+ :start-after: [START howto_operator_s3_put_bucket_tagging]
+ :end-before: [END howto_operator_s3_put_bucket_tagging]
-.. _howto/operator:S3DeleteBucketTaggingOperator:
.. _howto/operator:S3GetBucketTaggingOperator:
-.. _howto/operator:S3PutBucketTaggingOperator:
-Using Amazon S3 Bucket Tagging
-------------------------------
+Get the tags of an Amazon S3 Bucket
+-----------------------------------
-Purpose
-"""""""
+To get the tag set associated with an Amazon S3 bucket you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3GetBucketTaggingOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_s3_get_bucket_tagging]
+ :end-before: [END howto_operator_s3_get_bucket_tagging]
-This example dag ``example_s3_bucket_tagging.py`` uses ``S3DeleteBucketTaggingOperator``, ``S3GetBucketTaggingOperator``,
-and ``S3PutBucketTaggingOperator`` to create a new S3 bucket, apply tagging, get tagging, delete tagging, then delete the bucket.
-Defining tasks
-""""""""""""""
+.. _howto/operator:S3DeleteBucketTaggingOperator:
+
+Delete the tags of an Amazon S3 Bucket
+--------------------------------------
-In the following code we create a new S3 bucket, apply tagging, get tagging, delete tagging, then delete the bucket.
+To delete the tags of an Amazon S3 bucket you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketTaggingOperator`.
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
:language: python
- :start-after: [START howto_operator_s3_bucket_tagging]
- :end-before: [END howto_operator_s3_bucket_tagging]
+ :dedent: 4
+ :start-after: [START howto_operator_s3_delete_bucket_tagging]
+ :end-before: [END howto_operator_s3_delete_bucket_tagging]
Reference
---------