Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/29 22:36:12 UTC

[airflow] branch master updated: Add Delete/Create S3 bucket operators (#8895)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 357e11e  Add Delete/Create S3 bucket operators (#8895)
357e11e is described below

commit 357e11e0cfb4c02833018e073bc4f5e5b52fae4f
Author: S S Rohit <ro...@gmail.com>
AuthorDate: Sat May 30 04:05:40 2020 +0530

    Add Delete/Create S3 bucket operators (#8895)
---
 .../amazon/aws/example_dags/example_s3_bucket.py   | 65 +++++++++++++++
 airflow/providers/amazon/aws/hooks/base_aws.py     | 16 ++--
 airflow/providers/amazon/aws/hooks/s3.py           | 20 +++++
 .../providers/amazon/aws/operators/s3_bucket.py    | 95 ++++++++++++++++++++++
 docs/operators-and-hooks-ref.rst                   |  3 +-
 tests/providers/amazon/aws/hooks/test_s3.py        | 21 ++++-
 .../amazon/aws/operators/test_example_s3_bucket.py | 26 ++++++
 .../amazon/aws/operators/test_s3_bucket.py         | 84 +++++++++++++++++++
 8 files changed, 319 insertions(+), 11 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
new file mode 100644
index 0000000..0321cfa
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from airflow.models.dag import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.utils.dates import days_ago
+
+BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345')
+
+
+def upload_keys():
+    """This is a python callback to add keys into the s3 bucket"""
+    # add keys to bucket
+    s3_hook = S3Hook()
+    for i in range(0, 3):
+        s3_hook.load_string(
+            string_data="input",
+            key=f"path/data{i}",
+            bucket_name=BUCKET_NAME,
+        )
+
+
+with DAG(
+    dag_id='s3_bucket_dag',
+    schedule_interval=None,
+    start_date=days_ago(2),
+    max_active_runs=1,
+    tags=['example'],
+) as dag:
+
+    create_bucket = S3CreateBucketOperator(
+        task_id='s3_bucket_dag_create',
+        bucket_name=BUCKET_NAME,
+        region_name='us-east-1',
+    )
+
+    add_keys_to_bucket = PythonOperator(
+        task_id="s3_bucket_dag_add_keys_to_bucket",
+        python_callable=upload_keys
+    )
+
+    delete_bucket = S3DeleteBucketOperator(
+        task_id='s3_bucket_dag_delete',
+        bucket_name=BUCKET_NAME,
+        force_delete=True,
+    )
+
+    create_bucket >> add_keys_to_bucket >> delete_bucket
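
Note that the upload_keys callable above uses S3Hook() with the default 'aws_default' Airflow connection. A minimal variant, assuming you prefer boto3's own credential resolution (the behaviour the aws_conn_id docstrings below describe for a None or empty connection id), could be sketched as:

    def upload_keys_with_boto3_defaults():
        # aws_conn_id=None falls back to the default boto3 credential chain
        s3_hook = S3Hook(aws_conn_id=None)
        s3_hook.load_string(string_data="input", key="path/data0", bucket_name=BUCKET_NAME)
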
diff --git a/airflow/providers/amazon/aws/hooks/base_aws.py b/airflow/providers/amazon/aws/hooks/base_aws.py
index 61b80fb..83bb17e 100644
--- a/airflow/providers/amazon/aws/hooks/base_aws.py
+++ b/airflow/providers/amazon/aws/hooks/base_aws.py
@@ -62,14 +62,14 @@ class AwsBaseHook(BaseHook):
     """
 
     def __init__(
-            self,
-            aws_conn_id: Optional[str] = "aws_default",
-            verify: Union[bool, str, None] = None,
-            region_name: Optional[str] = None,
-            client_type: Optional[str] = None,
-            resource_type: Optional[str] = None,
-            config: Optional[Config] = None
-    ):
+        self,
+        aws_conn_id: Optional[str] = "aws_default",
+        verify: Union[bool, str, None] = None,
+        region_name: Optional[str] = None,
+        client_type: Optional[str] = None,
+        resource_type: Optional[str] = None,
+        config: Optional[Config] = None
+    ) -> None:
         super().__init__()
         self.aws_conn_id = aws_conn_id
         self.verify = verify
diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py
index 331a999..46d9043 100644
--- a/airflow/providers/amazon/aws/hooks/s3.py
+++ b/airflow/providers/amazon/aws/hooks/s3.py
@@ -664,6 +664,26 @@ class S3Hook(AwsBaseHook):
                                                ACL=acl_policy)
         return response
 
+    @provide_bucket_name
+    def delete_bucket(self, bucket_name: str, force_delete: bool = False) -> None:
+        """
+        Delete an S3 bucket. If force_delete is enabled, all objects in the bucket are deleted before the bucket itself.
+
+        :param bucket_name: Bucket name
+        :type bucket_name: str
+        :param force_delete: Enable this to delete the bucket even if it is not empty
+        :type force_delete: bool
+        :return: None
+        :rtype: None
+        """
+        if force_delete:
+            bucket_keys = self.list_keys(bucket_name=bucket_name)
+            if bucket_keys:
+                self.delete_objects(bucket=bucket_name, keys=bucket_keys)
+        self.conn.delete_bucket(
+            Bucket=bucket_name
+        )
+
     def delete_objects(self, bucket, keys):
         """
         Delete keys from the bucket.
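
For reference, a minimal sketch (not part of this commit) of calling the new S3Hook.delete_bucket method directly from a task callable; the connection id 'my_aws_conn' and the bucket name 'my-test-bucket' are hypothetical placeholders:

    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    def cleanup_bucket():
        # With force_delete=True the hook first deletes any remaining keys,
        # then removes the now-empty bucket.
        hook = S3Hook(aws_conn_id='my_aws_conn')
        hook.delete_bucket(bucket_name='my-test-bucket', force_delete=True)
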
diff --git a/airflow/providers/amazon/aws/operators/s3_bucket.py b/airflow/providers/amazon/aws/operators/s3_bucket.py
new file mode 100644
index 0000000..a740aba
--- /dev/null
+++ b/airflow/providers/amazon/aws/operators/s3_bucket.py
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains AWS S3 operators.
+"""
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+class S3CreateBucketOperator(BaseOperator):
+    """
+    This operator creates an S3 bucket.
+
+    :param bucket_name: This is the bucket name you want to create.
+    :type bucket_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    :param region_name: AWS region_name. If not specified, it is fetched from the connection.
+    :type region_name: Optional[str]
+    """
+    def __init__(self,
+                 bucket_name,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 region_name: Optional[str] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.aws_conn_id = aws_conn_id
+        self.region_name = region_name
+
+    def execute(self, context):
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)
+        if not s3_hook.check_for_bucket(self.bucket_name):
+            s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name)
+            self.log.info("Created bucket with name: %s", self.bucket_name)
+        else:
+            self.log.info("Bucket with name: %s already exists", self.bucket_name)
+
+
+class S3DeleteBucketOperator(BaseOperator):
+    """
+    This operator deletes an S3 bucket.
+
+    :param bucket_name: This is the bucket name you want to delete.
+    :type bucket_name: str
+    :param force_delete: Forcibly delete all objects in the bucket before deleting the bucket
+    :type force_delete: bool
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    """
+    def __init__(self,
+                 bucket_name,
+                 force_delete: bool = False,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.force_delete = force_delete
+        self.aws_conn_id = aws_conn_id
+
+    def execute(self, context):
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
+        if s3_hook.check_for_bucket(self.bucket_name):
+            s3_hook.delete_bucket(bucket_name=self.bucket_name, force_delete=self.force_delete)
+            self.log.info("Deleted bucket with name: %s", self.bucket_name)
+        else:
+            self.log.info("Bucket with name: %s doesn't exist", self.bucket_name)
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index c39f6e4..3eaf1a0 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -490,7 +490,8 @@ These integrations allow you to perform various operations within the Amazon Web
    * - `Amazon Simple Storage Service (S3) <https://aws.amazon.com/s3/>`__
      -
      - :mod:`airflow.providers.amazon.aws.hooks.s3`
-     - :mod:`airflow.providers.amazon.aws.operators.s3_file_transform`,
+     - :mod:`airflow.providers.amazon.aws.operators.s3_bucket`,
+       :mod:`airflow.providers.amazon.aws.operators.s3_file_transform`,
        :mod:`airflow.providers.amazon.aws.operators.s3_copy_object`,
        :mod:`airflow.providers.amazon.aws.operators.s3_delete_objects`,
        :mod:`airflow.providers.amazon.aws.operators.s3_list`
diff --git a/tests/providers/amazon/aws/hooks/test_s3.py b/tests/providers/amazon/aws/hooks/test_s3.py
index 20fa21e..59f823a 100644
--- a/tests/providers/amazon/aws/hooks/test_s3.py
+++ b/tests/providers/amazon/aws/hooks/test_s3.py
@@ -16,7 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
 import gzip as gz
 import os
 import tempfile
@@ -25,7 +24,7 @@ from unittest.mock import Mock
 import boto3
 import mock
 import pytest
-from botocore.exceptions import NoCredentialsError
+from botocore.exceptions import ClientError, NoCredentialsError
 
 from airflow.exceptions import AirflowException
 from airflow.models import Connection
@@ -313,6 +312,24 @@ class TestAwsS3Hook:
             assert ((response['Grants'][0]['Permission'] == 'FULL_CONTROL') and
                     (len(response['Grants']) == 1))
 
+    @mock_s3
+    def test_delete_bucket_if_bucket_exist(self, s3_bucket):
+        # create a bucket, delete it, and assert that it no longer exists
+        mock_hook = S3Hook()
+        mock_hook.create_bucket(bucket_name=s3_bucket)
+        assert mock_hook.check_for_bucket(bucket_name=s3_bucket)
+        mock_hook.delete_bucket(bucket_name=s3_bucket, force_delete=True)
+        assert not mock_hook.check_for_bucket(s3_bucket)
+
+    @mock_s3
+    def test_delete_bucket_if_not_bucket_exist(self, s3_bucket):
+        # assert that a ClientError is raised when the bucket does not exist
+        mock_hook = S3Hook()
+        with pytest.raises(ClientError) as error:
+            mock_hook.delete_bucket(bucket_name=s3_bucket, force_delete=True)
+        assert error.value.response['Error']['Code'] == 'NoSuchBucket'
+
     @mock.patch.object(S3Hook, 'get_connection', return_value=Connection(schema='test_bucket'))
     def test_provide_bucket_name(self, mock_get_connection):
 
diff --git a/tests/providers/amazon/aws/operators/test_example_s3_bucket.py b/tests/providers/amazon/aws/operators/test_example_s3_bucket.py
new file mode 100644
index 0000000..84be5f7
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_example_s3_bucket.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from tests.test_utils.amazon_system_helpers import AWS_DAG_FOLDER, AmazonSystemTest, provide_aws_context
+
+
+class S3BucketExampleDagsSystemTest(AmazonSystemTest):
+    """
+    System tests for AWS S3 operators
+    """
+    @provide_aws_context()
+    def test_run_example_dag_s3(self):
+        self.run_dag('s3_bucket_dag', AWS_DAG_FOLDER)
diff --git a/tests/providers/amazon/aws/operators/test_s3_bucket.py b/tests/providers/amazon/aws/operators/test_s3_bucket.py
new file mode 100644
index 0000000..5913271
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_s3_bucket.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import unittest
+
+import mock
+from moto import mock_s3
+
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
+
+BUCKET_NAME = os.environ.get("BUCKET_NAME", "test-airflow-bucket")
+TASK_ID = os.environ.get("TASK_ID", "test-s3-operator")
+
+
+class TestS3CreateBucketOperator(unittest.TestCase):
+    def setUp(self):
+        self.create_bucket_operator = S3CreateBucketOperator(
+            task_id=TASK_ID,
+            bucket_name=BUCKET_NAME,
+        )
+
+    @mock_s3
+    @mock.patch.object(S3Hook, "create_bucket")
+    @mock.patch.object(S3Hook, "check_for_bucket")
+    def test_execute_if_bucket_exist(self, mock_check_for_bucket, mock_create_bucket):
+        mock_check_for_bucket.return_value = True
+        # execute s3 bucket create operator
+        self.create_bucket_operator.execute({})
+        mock_check_for_bucket.assert_called_once_with(BUCKET_NAME)
+        mock_create_bucket.assert_not_called()
+
+    @mock_s3
+    @mock.patch.object(S3Hook, "create_bucket")
+    @mock.patch.object(S3Hook, "check_for_bucket")
+    def test_execute_if_not_bucket_exist(self, mock_check_for_bucket, mock_create_bucket):
+        mock_check_for_bucket.return_value = False
+        # execute s3 bucket create operator
+        self.create_bucket_operator.execute({})
+        mock_check_for_bucket.assert_called_once_with(BUCKET_NAME)
+        mock_create_bucket.assert_called_once_with(bucket_name=BUCKET_NAME, region_name=None)
+
+
+class TestS3DeleteBucketOperator(unittest.TestCase):
+    def setUp(self):
+        self.delete_bucket_operator = S3DeleteBucketOperator(
+            task_id=TASK_ID,
+            bucket_name=BUCKET_NAME,
+        )
+
+    @mock_s3
+    @mock.patch.object(S3Hook, "delete_bucket")
+    @mock.patch.object(S3Hook, "check_for_bucket")
+    def test_execute_if_bucket_exist(self, mock_check_for_bucket, mock_delete_bucket):
+        mock_check_for_bucket.return_value = True
+        # execute s3 bucket delete operator
+        self.delete_bucket_operator.execute({})
+        mock_check_for_bucket.assert_called_once_with(BUCKET_NAME)
+        mock_delete_bucket.assert_called_once_with(bucket_name=BUCKET_NAME, force_delete=False)
+
+    @mock_s3
+    @mock.patch.object(S3Hook, "delete_bucket")
+    @mock.patch.object(S3Hook, "check_for_bucket")
+    def test_execute_if_not_bucket_exist(self, mock_check_for_bucket, mock_delete_bucket):
+        mock_check_for_bucket.return_value = False
+        # execute s3 bucket delete operator
+        self.delete_bucket_operator.execute({})
+        mock_check_for_bucket.assert_called_once_with(BUCKET_NAME)
+        mock_delete_bucket.assert_not_called()