Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/20 07:16:17 UTC

[GitHub] [airflow] feluelle commented on a change in pull request #8895: Add Delete/Create S3 bucket operators

feluelle commented on a change in pull request #8895:
URL: https://github.com/apache/airflow/pull/8895#discussion_r427785346



##########
File path: airflow/providers/amazon/aws/operators/s3_bucket.py
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains AWS S3 operators.
+"""
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+class S3CreateBucketOperator(BaseOperator):
+    """
+    This operator creates an S3 bucket.
+
+    :param bucket_name: The name of the bucket to create
+    :type bucket_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    :param region_name: AWS region name. If not specified, it is fetched from the connection.
+    :type region_name: Optional[str]
+    """
+    def __init__(self,
+                 bucket_name,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 region_name: Optional[str] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.region_name = region_name
+        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id, region_name=region_name)
+
+    def execute(self, context):
+        S3CreateBucketOperator.check_for_bucket(self.s3_hook, self.bucket_name)
+        self.s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name)
+        self.log.info("Created bucket with name: %s", self.bucket_name)
+
+    @staticmethod
+    def check_for_bucket(s3_hook: S3Hook, bucket_name: str) -> None:
+        """
+        Override this method if you don't want to raise an exception if the bucket already exists.
+
+        :param s3_hook: Hook to interact with AWS S3 services
+        :type s3_hook: S3Hook
+        :param bucket_name: Bucket name
+        :type bucket_name: str
+        :return: None
+        :rtype: None
+        """
+        if s3_hook.check_for_bucket(bucket_name):
+            raise AirflowException(f"The bucket name {bucket_name} already exists")
+
+
+class S3DeleteBucketOperator(BaseOperator):
+    """
+    This operator deletes an S3 bucket.
+
+    :param bucket_name: The name of the bucket to delete
+    :type bucket_name: str
+    :param force_delete: Enable this to delete the bucket even if it is not empty
+    :type force_delete: bool
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    """
+    def __init__(self,
+                 bucket_name,
+                 force_delete: Optional[bool] = False,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.force_delete = force_delete
+        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id)
+
+    def execute(self, context):
+        S3DeleteBucketOperator.check_for_bucket(self.s3_hook, self.bucket_name)
+        self.s3_hook.delete_bucket(bucket_name=self.bucket_name, force_delete=self.force_delete)
+        self.log.info("Deleted bucket with name: %s", self.bucket_name)
+
+    @staticmethod
+    def check_for_bucket(s3_hook: S3Hook, bucket_name: str) -> None:
+        """
+        Override this method if you don't want to raise an exception if the bucket doesn't exist.
+
+        :param s3_hook: Hook to interact with AWS S3 services
+        :type s3_hook: S3Hook
+        :param bucket_name: Bucket name
+        :type bucket_name: str
+        :return: None
+        :rtype: None
+        """
+        if not s3_hook.check_for_bucket(bucket_name):
+            raise AirflowException(f"The bucket name {bucket_name} doesn't exist")

Review comment:
       Same here - `check_if_not_exists`. WDYT?
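    For illustration, with such a `check_if_not_exists` flag the delete operator's check could reduce to a fragment like this (sketch only, mirroring the `check_if_exists` suggestion further down, not code from this PR):
    ```python
            # Assumed constructor flag; only raise when the caller asked for the existence check.
            if check_if_not_exists and not s3_hook.check_for_bucket(bucket_name):
                raise AirflowException(f"The bucket name {bucket_name} doesn't exist")
    ```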

##########
File path: tests/providers/amazon/aws/operators/test_example_s3_bucket.py
##########
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from tests.test_utils.amazon_system_helpers import AWS_DAG_FOLDER, AmazonSystemTest, provide_aws_context
+
+
+class S3SystemTest(AmazonSystemTest):

Review comment:
       ```suggestion
   class S3BucketExampleDagsSystemTest(AmazonSystemTest):
   ```

##########
File path: airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
##########
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import io
+
+import boto3
+
+from airflow.models.dag import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.utils.dates import days_ago
+
+DAG_NAME = 's3_bucket_dag'
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': True,
+    'start_date': days_ago(2)
+}
+BUCKET_NAME = 'test-airflow-12345'
+
+
+def upload_keys():
+    # add keys to bucket
+    conn = boto3.client('s3')
+    key_pattern = "path/data"
+    n_keys = 3
+    keys = [key_pattern + str(i) for i in range(n_keys)]
+    for k in keys:
+        conn.upload_fileobj(Bucket=BUCKET_NAME,
+                            Key=k,
+                            Fileobj=io.BytesIO(b"input"))

Review comment:
       ```suggestion
       s3_hook = S3Hook()
       for i in range(3):
        s3_hook.load_string(string_data="input", key=f"path/data{i}", bucket_name=BUCKET_NAME)
   ```
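    Spelled out with the import it needs, the callback could look like this minimal sketch; I used `load_string` here since the payload is a plain string, and reused the key pattern and bucket constant from the example DAG above:
    ```python
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    BUCKET_NAME = 'test-airflow-12345'


    def upload_keys():
        """Add a few small keys to the bucket via S3Hook instead of a raw boto3 client."""
        s3_hook = S3Hook()
        for i in range(3):
            # load_string writes the given string under the given key, so no file object is needed
            s3_hook.load_string(string_data="input", key=f"path/data{i}", bucket_name=BUCKET_NAME)
    ```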

##########
File path: airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
##########
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import io
+
+import boto3
+
+from airflow.models.dag import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.utils.dates import days_ago
+
+DAG_NAME = 's3_bucket_dag'
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': True,
+    'start_date': days_ago(2)
+}
+BUCKET_NAME = 'test-airflow-12345'

Review comment:
       It would be nice if you could make this configurable via environment variables.
   ```suggestion
    BUCKET_NAME = getenv('BUCKET_NAME', 'test-airflow-12345')
   ```
   (`from os import getenv`)

##########
File path: airflow/providers/amazon/aws/operators/s3_bucket.py
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+This module contains AWS S3 operators.
+"""
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+
+
+class S3CreateBucketOperator(BaseOperator):
+    """
+    This operator creates an S3 bucket.
+
+    :param bucket_name: The name of the bucket to create
+    :type bucket_name: str
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+        If this is None or empty then the default boto3 behaviour is used. If
+        running Airflow in a distributed manner and aws_conn_id is None or
+        empty, then default boto3 configuration would be used (and must be
+        maintained on each worker node).
+    :type aws_conn_id: Optional[str]
+    :param region_name: AWS region name. If not specified, it is fetched from the connection.
+    :type region_name: Optional[str]
+    """
+    def __init__(self,
+                 bucket_name,
+                 aws_conn_id: Optional[str] = "aws_default",
+                 region_name: Optional[str] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.bucket_name = bucket_name
+        self.region_name = region_name
+        self.s3_hook = S3Hook(aws_conn_id=aws_conn_id, region_name=region_name)
+
+    def execute(self, context):
+        S3CreateBucketOperator.check_for_bucket(self.s3_hook, self.bucket_name)
+        self.s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name)
+        self.log.info("Created bucket with name: %s", self.bucket_name)
+
+    @staticmethod
+    def check_for_bucket(s3_hook: S3Hook, bucket_name: str) -> None:
+        """
+        Override this method if you don't want to raise an exception if the bucket already exists.
+
+        :param s3_hook: Hook to interact with AWS S3 services
+        :type s3_hook: S3Hook
+        :param bucket_name: Bucket name
+        :type bucket_name: str
+        :return: None
+        :rtype: None
+        """
+        if s3_hook.check_for_bucket(bucket_name):
+            raise AirflowException(f"The bucket name {bucket_name} already exists")

Review comment:
       I would suggest adding a `check_if_exists=True` argument to the constructor instead of a method to override.
   ```python
           if check_if_exists and s3_hook.check_for_bucket(bucket_name):
               raise AirflowException(f"The bucket name {bucket_name} already exists")
   ```
   WDYT?
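    Put together, the operator could then look roughly like this sketch; the `check_if_exists` flag follows the suggestion above, and everything else mirrors the code already in this PR:
    ```python
    from typing import Optional

    from airflow.exceptions import AirflowException
    from airflow.models import BaseOperator
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook


    class S3CreateBucketOperator(BaseOperator):
        """Create an S3 bucket, optionally failing if it already exists."""

        def __init__(self,
                     bucket_name: str,
                     check_if_exists: bool = True,  # suggested flag, not yet in the PR
                     aws_conn_id: Optional[str] = "aws_default",
                     region_name: Optional[str] = None,
                     *args,
                     **kwargs) -> None:
            super().__init__(*args, **kwargs)
            self.bucket_name = bucket_name
            self.check_if_exists = check_if_exists
            self.region_name = region_name
            self.s3_hook = S3Hook(aws_conn_id=aws_conn_id, region_name=region_name)

        def execute(self, context):
            # Fail fast only when the check is enabled and the bucket is already there.
            if self.check_if_exists and self.s3_hook.check_for_bucket(self.bucket_name):
                raise AirflowException(f"The bucket name {self.bucket_name} already exists")
            self.s3_hook.create_bucket(bucket_name=self.bucket_name, region_name=self.region_name)
            self.log.info("Created bucket with name: %s", self.bucket_name)
    ```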

##########
File path: tests/providers/amazon/aws/hooks/test_s3.py
##########
@@ -313,6 +313,29 @@ def test_copy_object_acl(self, s3_bucket):
             assert ((response['Grants'][0]['Permission'] == 'FULL_CONTROL') and
                     (len(response['Grants']) == 1))
 
+    @mock_s3
+    def test_delete_bucket(self, s3_bucket):
+        # assert if the bucket is created
+        conn = boto3.client('s3')

Review comment:
       You don't need `boto3` here. Use `mock_hook.load_file_obj` instead of `conn.upload_fileobj` :)
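    A possible shape of that test using only the hook, assuming the file's existing `io`, `mock_s3` and `S3Hook` imports and that the `s3_bucket` fixture provides an already created mock bucket name (illustrative only):
    ```python
        @mock_s3
        def test_delete_bucket(self, s3_bucket):
            mock_hook = S3Hook()
            # populate the bucket through the hook rather than a raw boto3 client
            for i in range(3):
                mock_hook.load_file_obj(io.BytesIO(b"input"), key=f"path/data{i}", bucket_name=s3_bucket)
            # force_delete removes the remaining keys before deleting the bucket itself
            mock_hook.delete_bucket(bucket_name=s3_bucket, force_delete=True)
            assert not mock_hook.check_for_bucket(bucket_name=s3_bucket)
    ```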

##########
File path: tests/providers/amazon/aws/operators/test_example_s3_bucket.py
##########
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from tests.test_utils.amazon_system_helpers import AWS_DAG_FOLDER, AmazonSystemTest, provide_aws_context
+
+
+class S3SystemTest(AmazonSystemTest):

Review comment:
       and please rename the file to `test_s3_bucket_system.py` for consistency. Thanks :)

##########
File path: tests/providers/amazon/aws/operators/test_s3_bucket.py
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import io
+import unittest
+
+import boto3
+from moto import mock_s3
+
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
+
+BUCKET_NAME = "test-airflow-bucket"
+TASK_ID = 'test-s3-operator'
+
+
+class TestS3CreateBucketOperator(unittest.TestCase):
+    @mock_s3
+    def test_execute(self):
+        mock_hook = S3Hook()
+        # execute s3 bucket create operator
+        op = S3CreateBucketOperator(bucket_name=BUCKET_NAME, task_id=TASK_ID)
+        op.execute({})
+        # assert if the bucket has been created
+        self.assertTrue(mock_hook.check_for_bucket(bucket_name=BUCKET_NAME))
+
+
+class TestS3DeleteBucketOperator(unittest.TestCase):
+    @mock_s3
+    def test_execute(self):
+        # assert if the bucket is created
+        conn = boto3.client('s3')

Review comment:
       Same here. We don't need to access `boto3` directly.
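    For reference, a hook-only version of that test could look like the sketch below (it reuses the `BUCKET_NAME`/`TASK_ID` constants and imports already in the file; illustrative only, not necessarily the final shape):
    ```python
    class TestS3DeleteBucketOperator(unittest.TestCase):
        @mock_s3
        def test_execute(self):
            mock_hook = S3Hook()
            # create the bucket through the hook so no boto3 client is needed
            mock_hook.create_bucket(bucket_name=BUCKET_NAME)
            # execute the delete operator and assert the bucket is gone
            op = S3DeleteBucketOperator(bucket_name=BUCKET_NAME, task_id=TASK_ID)
            op.execute({})
            self.assertFalse(mock_hook.check_for_bucket(bucket_name=BUCKET_NAME))
    ```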




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org