You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/03/19 12:56:12 UTC

[airflow] branch main updated: Update sample dag and doc for S3CreateBucketOperator, S3PutBucketTaggingOperator, S3GetBucketTaggingOperator, S3DeleteBucketTaggingOperator, S3DeleteBucketOperator (#22312)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5eb6335  Update sample dag and doc for S3CreateBucketOperator, S3PutBucketTaggingOperator, S3GetBucketTaggingOperator, S3DeleteBucketTaggingOperator, S3DeleteBucketOperator (#22312)
5eb6335 is described below

commit 5eb63357426598f99ed50b002b72aebdf8790f73
Author: vincbeck <97...@users.noreply.github.com>
AuthorDate: Sat Mar 19 08:55:11 2022 -0400

    Update sample dag and doc for S3CreateBucketOperator, S3PutBucketTaggingOperator, S3GetBucketTaggingOperator, S3DeleteBucketTaggingOperator, S3DeleteBucketOperator (#22312)
---
 .../amazon/aws/example_dags/example_s3_bucket.py   | 73 +++++++++++-------
 .../aws/example_dags/example_s3_bucket_tagging.py  | 62 ---------------
 .../operators/s3.rst                               | 90 +++++++++++++---------
 3 files changed, 98 insertions(+), 127 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
index 538538d..d04bbab 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
@@ -14,50 +14,65 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-# Ignore missing args provided by default_args
-# type: ignore[call-arg]
 
 import os
 from datetime import datetime
 
-from airflow.decorators import task
+from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.providers.amazon.aws.operators.s3 import (
+    S3CreateBucketOperator,
+    S3DeleteBucketOperator,
+    S3DeleteBucketTaggingOperator,
+    S3GetBucketTaggingOperator,
+    S3PutBucketTaggingOperator,
+)
 
 BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345')
-
-
-# [START howto_operator_s3_bucket]
-@task(task_id="s3_bucket_dag_add_keys_to_bucket")
-def upload_keys():
-    """This is a python callback to add keys into the s3 bucket"""
-    # add keys to bucket
-    s3_hook = S3Hook()
-    for i in range(0, 3):
-        s3_hook.load_string(
-            string_data="input",
-            key=f"path/data{i}",
-            bucket_name=BUCKET_NAME,
-        )
-
+TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
+TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')
 
 with DAG(
-    dag_id='s3_bucket_dag',
+    dag_id='example_s3_bucket',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    default_args={"bucket_name": BUCKET_NAME},
-    max_active_runs=1,
     tags=['example'],
 ) as dag:
+    # [START howto_operator_s3_create_bucket]
+    create_bucket = S3CreateBucketOperator(
+        task_id='s3_create_bucket',
+        bucket_name=BUCKET_NAME,
+    )
+    # [END howto_operator_s3_create_bucket]
+
+    # [START howto_operator_s3_put_bucket_tagging]
+    put_tagging = S3PutBucketTaggingOperator(
+        task_id='s3_put_bucket_tagging',
+        bucket_name=BUCKET_NAME,
+        key=TAG_KEY,
+        value=TAG_VALUE,
+    )
+    # [END howto_operator_s3_put_bucket_tagging]
 
-    create_bucket = S3CreateBucketOperator(task_id='s3_bucket_dag_create', region_name='us-east-1')
+    # [START howto_operator_s3_get_bucket_tagging]
+    get_tagging = S3GetBucketTaggingOperator(
+        task_id='s3_get_bucket_tagging',
+        bucket_name=BUCKET_NAME,
+    )
+    # [END howto_operator_s3_get_bucket_tagging]
 
-    # Using a task-decorated function to add keys
-    add_keys_to_bucket = upload_keys()
+    # [START howto_operator_s3_delete_bucket_tagging]
+    delete_tagging = S3DeleteBucketTaggingOperator(
+        task_id='s3_delete_bucket_tagging',
+        bucket_name=BUCKET_NAME,
+    )
+    # [END howto_operator_s3_delete_bucket_tagging]
 
-    delete_bucket = S3DeleteBucketOperator(task_id='s3_bucket_dag_delete', force_delete=True)
+    # [START howto_operator_s3_delete_bucket]
+    delete_bucket = S3DeleteBucketOperator(
+        task_id='s3_delete_bucket', bucket_name=BUCKET_NAME, force_delete=True
+    )
+    # [END howto_operator_s3_delete_bucket]
 
-    create_bucket >> add_keys_to_bucket >> delete_bucket
-    # [END howto_operator_s3_bucket]
+    chain(create_bucket, put_tagging, get_tagging, delete_tagging, delete_bucket)
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py b/airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py
deleted file mode 100644
index 7312dcc..0000000
--- a/airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Ignore missing args provided by default_args
-# type: ignore[call-arg]
-
-import os
-from datetime import datetime
-
-from airflow.models.dag import DAG
-from airflow.providers.amazon.aws.operators.s3 import (
-    S3CreateBucketOperator,
-    S3DeleteBucketOperator,
-    S3DeleteBucketTaggingOperator,
-    S3GetBucketTaggingOperator,
-    S3PutBucketTaggingOperator,
-)
-
-BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-s3-bucket-tagging')
-TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
-TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')
-
-
-# [START howto_operator_s3_bucket_tagging]
-with DAG(
-    dag_id='s3_bucket_tagging_dag',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    default_args={"bucket_name": BUCKET_NAME},
-    max_active_runs=1,
-    tags=['example'],
-) as dag:
-
-    create_bucket = S3CreateBucketOperator(task_id='s3_bucket_tagging_dag_create', region_name='us-east-1')
-
-    delete_bucket = S3DeleteBucketOperator(task_id='s3_bucket_tagging_dag_delete', force_delete=True)
-
-    get_tagging = S3GetBucketTaggingOperator(task_id='s3_bucket_tagging_dag_get_tagging')
-
-    put_tagging = S3PutBucketTaggingOperator(
-        task_id='s3_bucket_tagging_dag_put_tagging', key=TAG_KEY, value=TAG_VALUE
-    )
-
-    delete_tagging = S3DeleteBucketTaggingOperator(task_id='s3_bucket_tagging_dag_delete_tagging')
-
-    create_bucket >> put_tagging >> get_tagging >> delete_tagging >> delete_bucket
-    # [END howto_operator_s3_bucket_tagging]
diff --git a/docs/apache-airflow-providers-amazon/operators/s3.rst b/docs/apache-airflow-providers-amazon/operators/s3.rst
index 29c58af..82868b6 100644
--- a/docs/apache-airflow-providers-amazon/operators/s3.rst
+++ b/docs/apache-airflow-providers-amazon/operators/s3.rst
@@ -19,14 +19,6 @@
 Amazon S3 Operators
 ====================
 
-Prerequisite Tasks
-------------------
-
-.. include:: _partials/prerequisite_tasks.rst
-
-Overview
---------
-
 Airflow to Amazon Simple Storage Service (S3) integration provides several operators to create and interact with S3 buckets.
 
  - :class:`~airflow.providers.amazon.aws.sensors.s3.S3KeySensor`
@@ -43,55 +35,81 @@ Airflow to Amazon Simple Storage Service (S3) integration provides several opera
  - :class:`~airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator`
  - :class:`~airflow.providers.amazon.aws.operators.s3.S3ListOperator`
 
-Two example_dags are provided which showcase these operators in action.
+Prerequisite Tasks
+------------------
 
- - example_s3_bucket.py
- - example_s3_bucket_tagging.py
+.. include:: _partials/prerequisite_tasks.rst
 
 .. _howto/operator:S3CreateBucketOperator:
+
+Create an Amazon S3 Bucket
+--------------------------
+
+To create an Amazon S3 bucket you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_s3_create_bucket]
+    :end-before: [END howto_operator_s3_create_bucket]
+
 .. _howto/operator:S3DeleteBucketOperator:
 
-Create and Delete Amazon S3 Buckets
------------------------------------
+Delete an Amazon S3 Bucket
+--------------------------
 
-Purpose
-"""""""
+To delete an Amazon S3 bucket you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketOperator`.
 
-This example dag ``example_s3_bucket.py`` uses ``S3CreateBucketOperator`` and ``S3DeleteBucketOperator`` to create a
-new S3 bucket with a given bucket name then delete it.
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_s3_delete_bucket]
+    :end-before: [END howto_operator_s3_delete_bucket]
 
-Defining tasks
-""""""""""""""
+.. _howto/operator:S3PutBucketTaggingOperator:
 
-In the following code we create a new bucket, add keys, and then delete the bucket.
+Set the tags for an Amazon S3 Bucket
+------------------------------------
+
+To set the tags for an Amazon S3 bucket you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3PutBucketTaggingOperator`.
 
 .. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
     :language: python
-    :start-after: [START howto_operator_s3_bucket]
-    :end-before: [END howto_operator_s3_bucket]
+    :dedent: 4
+    :start-after: [START howto_operator_s3_put_bucket_tagging]
+    :end-before: [END howto_operator_s3_put_bucket_tagging]
 
-.. _howto/operator:S3DeleteBucketTaggingOperator:
 .. _howto/operator:S3GetBucketTaggingOperator:
-.. _howto/operator:S3PutBucketTaggingOperator:
 
-Using Amazon S3 Bucket Tagging
-------------------------------
+Get the tag of an Amazon S3 Bucket
+----------------------------------
 
-Purpose
-"""""""
+To get the tag set associated with an Amazon S3 bucket you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3GetBucketTaggingOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_s3_get_bucket_tagging]
+    :end-before: [END howto_operator_s3_get_bucket_tagging]
 
-This example dag ``example_s3_bucket_tagging.py`` uses ``S3DeleteBucketTaggingOperator``, ``S3GetBucketTaggingOperator``,
-and ``S3PutBucketTaggingOperator`` to create a new S3 bucket, apply tagging, get tagging, delete tagging, then delete the bucket.
 
-Defining tasks
-""""""""""""""
+.. _howto/operator:S3DeleteBucketTaggingOperator:
+
+Delete the tags of an Amazon S3 Bucket
+--------------------------------------
 
-In the following code we create a new S3 bucket, apply tagging, get tagging, delete tagging, then delete the bucket.
+To delete the tags of an Amazon S3 bucket you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketTaggingOperator`.
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket_tagging.py
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
     :language: python
-    :start-after: [START howto_operator_s3_bucket_tagging]
-    :end-before: [END howto_operator_s3_bucket_tagging]
+    :dedent: 4
+    :start-after: [START howto_operator_s3_delete_bucket_tagging]
+    :end-before: [END howto_operator_s3_delete_bucket_tagging]
 
 Reference
 ---------