Posted to commits@airflow.apache.org by el...@apache.org on 2022/04/20 06:42:49 UTC

[airflow] branch main updated: Add `S3CreateObjectOperator` (#22758)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e1ac6e425 Add `S3CreateObjectOperator` (#22758)
9e1ac6e425 is described below

commit 9e1ac6e425aa52a55601bb2b5587fd97d361bfcc
Author: Vincent <97...@users.noreply.github.com>
AuthorDate: Wed Apr 20 00:42:37 2022 -0600

    Add `S3CreateObjectOperator` (#22758)
    
    * Add S3CreateObjectOperator
    
    Co-authored-by: eladkal <45...@users.noreply.github.com>
---
 .../amazon/aws/example_dags/example_s3.py          |  21 +++++
 airflow/providers/amazon/aws/operators/s3.py       |  88 ++++++++++++++++++
 .../operators/s3.rst                               |  15 +++
 .../amazon/aws/operators/test_s3_copy_object.py    |  83 -----------------
 ...test_s3_delete_objects.py => test_s3_object.py} | 101 ++++++++++++++++++++-
 5 files changed, 222 insertions(+), 86 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_s3.py b/airflow/providers/amazon/aws/example_dags/example_s3.py
index 72aa6d238d..81508f365c 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3.py
@@ -24,6 +24,7 @@ from airflow.models.dag import DAG
 from airflow.providers.amazon.aws.operators.s3 import (
     S3CopyObjectOperator,
     S3CreateBucketOperator,
+    S3CreateObjectOperator,
     S3DeleteBucketOperator,
     S3DeleteBucketTaggingOperator,
     S3DeleteObjectsOperator,
@@ -38,6 +39,15 @@ KEY = os.environ.get('KEY', 'key')
 KEY_2 = os.environ.get('KEY_2', 'key2')
 TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
 TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')
+KEY = os.environ.get('KEY', 'key')
+DATA = os.environ.get(
+    'DATA',
+    '''
+apple,0.5
+milk,2.5
+bread,4.0
+''',
+)
 
 with DAG(
     dag_id='example_s3',
@@ -92,6 +102,16 @@ with DAG(
     )
     # [END howto_operator_s3_delete_bucket_tagging]
 
+    # [START howto_operator_s3_create_object]
+    s3_create_object = S3CreateObjectOperator(
+        task_id="s3_create_object",
+        s3_bucket=BUCKET_NAME,
+        s3_key=KEY,
+        data=DATA,
+        replace=True,
+    )
+    # [END howto_operator_s3_create_object]
+
     # [START howto_sensor_s3_key_single_key]
     # Check if a file exists
     s3_sensor_one_key = S3KeySensor(
@@ -149,6 +169,7 @@ with DAG(
         put_tagging,
         get_tagging,
         delete_tagging,
+        s3_create_object,
         [s3_sensor_one_key, s3_sensor_two_keys, s3_sensor_key_function],
         s3_copy_object,
         s3_delete_objects,
diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index fb543048f6..2ed7983e1a 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -322,6 +322,94 @@ class S3CopyObjectOperator(BaseOperator):
         )
 
 
+class S3CreateObjectOperator(BaseOperator):
+    """
+    Creates a new S3 object from `data`, provided as a string or bytes.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:S3CreateObjectOperator`
+
+    :param s3_bucket: Name of the S3 bucket in which to save the object. (templated)
+        It should be omitted when `s3_key` is provided as a full s3:// URL.
+    :param s3_key: The key of the object to be created. (templated)
+        It can be either a full s3:// style URL or a relative path from the root level.
+        When it's specified as a full s3:// URL, please omit `s3_bucket`.
+    :param data: string or bytes to save as content.
+    :param replace: If True, the key will be overwritten if it already exists.
+    :param encrypt: If True, the file will be encrypted on the server-side
+        by S3 and will be stored in an encrypted form while at rest in S3.
+    :param acl_policy: String specifying the canned ACL policy for the file being
+        uploaded to the S3 bucket.
+    :param encoding: The string-to-bytes encoding to use.
+        It should be specified only when `data` is provided as a string.
+    :param compression: Type of compression to use; currently only gzip is supported.
+        It can be specified only when `data` is provided as a string.
+    :param aws_conn_id: Connection id of the S3 connection to use.
+    :param verify: Whether or not to verify SSL certificates for the S3 connection.
+        By default SSL certificates are verified.
+
+        You can provide the following values:
+
+        - False: do not validate SSL certificates. SSL will still be used,
+                 but SSL certificates will not be verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+                 You can specify this argument if you want to use a different
+                 CA cert bundle than the one used by botocore.
+
+    """
+
+    template_fields: Sequence[str] = ('s3_bucket', 's3_key', 'data')
+
+    def __init__(
+        self,
+        *,
+        s3_bucket: Optional[str] = None,
+        s3_key: str,
+        data: Union[str, bytes],
+        replace: bool = False,
+        encrypt: bool = False,
+        acl_policy: Optional[str] = None,
+        encoding: Optional[str] = None,
+        compression: Optional[str] = None,
+        aws_conn_id: str = 'aws_default',
+        verify: Optional[Union[str, bool]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.data = data
+        self.replace = replace
+        self.encrypt = encrypt
+        self.acl_policy = acl_policy
+        self.encoding = encoding
+        self.compression = compression
+        self.aws_conn_id = aws_conn_id
+        self.verify = verify
+
+    def execute(self, context: 'Context'):
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+
+        if isinstance(self.data, str):
+            s3_hook.load_string(
+                self.data,
+                self.s3_key,
+                self.s3_bucket,
+                self.replace,
+                self.encrypt,
+                self.encoding,
+                self.acl_policy,
+                self.compression,
+            )
+        else:
+            s3_hook.load_bytes(
+                self.data, self.s3_key, self.s3_bucket, self.replace, self.encrypt, self.acl_policy
+            )
+
+
 class S3DeleteObjectsOperator(BaseOperator):
     """
     To enable users to delete single object or multiple objects from
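
For context on the `execute()` branch added above: string payloads are routed to `S3Hook.load_string`, which applies the optional encoding and gzip compression, while bytes payloads are routed to `S3Hook.load_bytes`. A minimal sketch of the equivalent direct hook calls follows; the connection id, bucket, and key names are illustrative assumptions, not part of this commit.

    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    # Assumed connection id; adjust for your environment.
    hook = S3Hook(aws_conn_id='aws_default')

    # String data goes through load_string, which handles encoding and
    # optional gzip compression before uploading.
    hook.load_string('apple,0.5\n', key='groceries.csv', bucket_name='my-bucket', replace=True)

    # Bytes data goes through load_bytes, which uploads the buffer as-is.
    hook.load_bytes(b'\x00\x01', key='raw.bin', bucket_name='my-bucket', replace=True)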
diff --git a/docs/apache-airflow-providers-amazon/operators/s3.rst b/docs/apache-airflow-providers-amazon/operators/s3.rst
index 6b7f385d10..aeb04b9dc4 100644
--- a/docs/apache-airflow-providers-amazon/operators/s3.rst
+++ b/docs/apache-airflow-providers-amazon/operators/s3.rst
@@ -28,6 +28,7 @@ Airflow to Amazon Simple Storage Service (S3) integration provides several opera
  - :class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketTaggingOperator`
  - :class:`~airflow.providers.amazon.aws.operators.s3.S3GetBucketTaggingOperator`
  - :class:`~airflow.providers.amazon.aws.operators.s3.S3PutBucketTaggingOperator`
+ - :class:`~airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator`
  - :class:`~airflow.providers.amazon.aws.operators.s3.S3CopyObjectOperator`
  - :class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator`
  - :class:`~airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator`
@@ -164,6 +165,20 @@ multiple files can match one key. The list of matched S3 object attributes conta
     :start-after: [START howto_sensor_s3_key_function]
     :end-before: [END howto_sensor_s3_key_function]
 
+.. _howto/operator:S3CreateObjectOperator:
+
+Create an Amazon S3 object
+--------------------------
+
+To create a new Amazon S3 object, or replace an existing one, you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_s3_create_object]
+    :end-before: [END howto_operator_s3_create_object]
+
 .. _howto/operator:S3CopyObjectOperator:
 
 Copy an Amazon S3 object
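
The documentation section added above pulls its example from example_s3.py via exampleinclude. As a quick self-contained illustration of the documented usage, a minimal DAG sketch is shown below; the DAG id, bucket, and key are assumptions for illustration only.

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

    with DAG(
        dag_id='example_s3_create_object_sketch',
        start_date=datetime(2022, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        # Writes the CSV payload to s3://my-bucket/groceries.csv,
        # overwriting the key if it already exists.
        create_object = S3CreateObjectOperator(
            task_id='create_object',
            s3_bucket='my-bucket',
            s3_key='groceries.csv',
            data='apple,0.5\nmilk,2.5\nbread,4.0\n',
            replace=True,
        )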
diff --git a/tests/providers/amazon/aws/operators/test_s3_copy_object.py b/tests/providers/amazon/aws/operators/test_s3_copy_object.py
deleted file mode 100644
index a2f4f9eddb..0000000000
--- a/tests/providers/amazon/aws/operators/test_s3_copy_object.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import io
-import unittest
-
-import boto3
-from moto import mock_s3
-
-from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
-
-
-class TestS3CopyObjectOperator(unittest.TestCase):
-    def setUp(self):
-        self.source_bucket = "bucket1"
-        self.source_key = "path1/data.txt"
-        self.dest_bucket = "bucket2"
-        self.dest_key = "path2/data_copy.txt"
-
-    @mock_s3
-    def test_s3_copy_object_arg_combination_1(self):
-        conn = boto3.client('s3')
-        conn.create_bucket(Bucket=self.source_bucket)
-        conn.create_bucket(Bucket=self.dest_bucket)
-        conn.upload_fileobj(Bucket=self.source_bucket, Key=self.source_key, Fileobj=io.BytesIO(b"input"))
-
-        # there should be nothing found before S3CopyObjectOperator is executed
-        assert 'Contents' not in conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
-
-        op = S3CopyObjectOperator(
-            task_id="test_task_s3_copy_object",
-            source_bucket_key=self.source_key,
-            source_bucket_name=self.source_bucket,
-            dest_bucket_key=self.dest_key,
-            dest_bucket_name=self.dest_bucket,
-        )
-        op.execute(None)
-
-        objects_in_dest_bucket = conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
-        # there should be object found, and there should only be one object found
-        assert len(objects_in_dest_bucket['Contents']) == 1
-        # the object found should be consistent with dest_key specified earlier
-        assert objects_in_dest_bucket['Contents'][0]['Key'] == self.dest_key
-
-    @mock_s3
-    def test_s3_copy_object_arg_combination_2(self):
-        conn = boto3.client('s3')
-        conn.create_bucket(Bucket=self.source_bucket)
-        conn.create_bucket(Bucket=self.dest_bucket)
-        conn.upload_fileobj(Bucket=self.source_bucket, Key=self.source_key, Fileobj=io.BytesIO(b"input"))
-
-        # there should be nothing found before S3CopyObjectOperator is executed
-        assert 'Contents' not in conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
-
-        source_key_s3_url = f"s3://{self.source_bucket}/{self.source_key}"
-        dest_key_s3_url = f"s3://{self.dest_bucket}/{self.dest_key}"
-        op = S3CopyObjectOperator(
-            task_id="test_task_s3_copy_object",
-            source_bucket_key=source_key_s3_url,
-            dest_bucket_key=dest_key_s3_url,
-        )
-        op.execute(None)
-
-        objects_in_dest_bucket = conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
-        # there should be object found, and there should only be one object found
-        assert len(objects_in_dest_bucket['Contents']) == 1
-        # the object found should be consistent with dest_key specified earlier
-        assert objects_in_dest_bucket['Contents'][0]['Key'] == self.dest_key
diff --git a/tests/providers/amazon/aws/operators/test_s3_delete_objects.py b/tests/providers/amazon/aws/operators/test_s3_object.py
similarity index 67%
rename from tests/providers/amazon/aws/operators/test_s3_delete_objects.py
rename to tests/providers/amazon/aws/operators/test_s3_object.py
index b878f1dc7e..3bf993fab5 100644
--- a/tests/providers/amazon/aws/operators/test_s3_delete_objects.py
+++ b/tests/providers/amazon/aws/operators/test_s3_object.py
@@ -15,15 +15,82 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import io
 import unittest
+from unittest import mock
 
 import boto3
 from moto import mock_s3
 
-from airflow.exceptions import AirflowException
-from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator
+from airflow import AirflowException
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.s3 import (
+    S3CopyObjectOperator,
+    S3CreateObjectOperator,
+    S3DeleteObjectsOperator,
+)
+
+S3_BUCKET = "test-airflow-bucket"
+S3_KEY = "test-airflow-key"
+TASK_ID = "test-s3-operator"
+
+
+class TestS3CopyObjectOperator(unittest.TestCase):
+    def setUp(self):
+        self.source_bucket = "bucket1"
+        self.source_key = "path1/data.txt"
+        self.dest_bucket = "bucket2"
+        self.dest_key = "path2/data_copy.txt"
+
+    @mock_s3
+    def test_s3_copy_object_arg_combination_1(self):
+        conn = boto3.client('s3')
+        conn.create_bucket(Bucket=self.source_bucket)
+        conn.create_bucket(Bucket=self.dest_bucket)
+        conn.upload_fileobj(Bucket=self.source_bucket, Key=self.source_key, Fileobj=io.BytesIO(b"input"))
+
+        # there should be nothing found before S3CopyObjectOperator is executed
+        assert 'Contents' not in conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
+
+        op = S3CopyObjectOperator(
+            task_id="test_task_s3_copy_object",
+            source_bucket_key=self.source_key,
+            source_bucket_name=self.source_bucket,
+            dest_bucket_key=self.dest_key,
+            dest_bucket_name=self.dest_bucket,
+        )
+        op.execute(None)
+
+        objects_in_dest_bucket = conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
+        # there should be object found, and there should only be one object found
+        assert len(objects_in_dest_bucket['Contents']) == 1
+        # the object found should be consistent with dest_key specified earlier
+        assert objects_in_dest_bucket['Contents'][0]['Key'] == self.dest_key
+
+    @mock_s3
+    def test_s3_copy_object_arg_combination_2(self):
+        conn = boto3.client('s3')
+        conn.create_bucket(Bucket=self.source_bucket)
+        conn.create_bucket(Bucket=self.dest_bucket)
+        conn.upload_fileobj(Bucket=self.source_bucket, Key=self.source_key, Fileobj=io.BytesIO(b"input"))
+
+        # there should be nothing found before S3CopyObjectOperator is executed
+        assert 'Contents' not in conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
+
+        source_key_s3_url = f"s3://{self.source_bucket}/{self.source_key}"
+        dest_key_s3_url = f"s3://{self.dest_bucket}/{self.dest_key}"
+        op = S3CopyObjectOperator(
+            task_id="test_task_s3_copy_object",
+            source_bucket_key=source_key_s3_url,
+            dest_bucket_key=dest_key_s3_url,
+        )
+        op.execute(None)
+
+        objects_in_dest_bucket = conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
+        # there should be object found, and there should only be one object found
+        assert len(objects_in_dest_bucket['Contents']) == 1
+        # the object found should be consistent with dest_key specified earlier
+        assert objects_in_dest_bucket['Contents'][0]['Key'] == self.dest_key
 
 
 class TestS3DeleteObjectsOperator(unittest.TestCase):
@@ -187,3 +254,31 @@ class TestS3DeleteObjectsOperator(unittest.TestCase):
         assert len(objects_in_dest_bucket['Contents']) == 1
         # the object found should be consistent with dest_key specified earlier
         assert objects_in_dest_bucket['Contents'][0]['Key'] == key_of_test
+
+
+class TestS3CreateObjectOperator(unittest.TestCase):
+    @mock.patch.object(S3Hook, "load_string")
+    def test_execute_if_data_is_string(self, mock_load_string):
+        data = "data"
+        operator = S3CreateObjectOperator(
+            task_id=TASK_ID,
+            s3_bucket=S3_BUCKET,
+            s3_key=S3_KEY,
+            data=data,
+        )
+        operator.execute(None)
+
+        mock_load_string.assert_called_once_with(data, S3_KEY, S3_BUCKET, False, False, None, None, None)
+
+    @mock.patch.object(S3Hook, "load_bytes")
+    def test_execute_if_data_is_bytes(self, mock_load_bytes):
+        data = b"data"
+        operator = S3CreateObjectOperator(
+            task_id=TASK_ID,
+            s3_bucket=S3_BUCKET,
+            s3_key=S3_KEY,
+            data=data,
+        )
+        operator.execute(None)
+
+        mock_load_bytes.assert_called_once_with(data, S3_KEY, S3_BUCKET, False, False, None)
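
The two unit tests above only assert that the corresponding hook method is called once with the expected positional arguments. A moto-backed end-to-end variant, in the style of the copy and delete tests earlier in this file, could look like the sketch below; it is not part of this commit, and the bucket and key names are assumptions.

    import boto3
    from moto import mock_s3

    from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

    @mock_s3
    def test_s3_create_object_end_to_end():
        # Create the destination bucket in moto's mocked S3.
        conn = boto3.client('s3')
        conn.create_bucket(Bucket="test-airflow-bucket")

        op = S3CreateObjectOperator(
            task_id="test-s3-operator",
            s3_bucket="test-airflow-bucket",
            s3_key="test-airflow-key",
            data="apple,0.5",
            replace=True,
        )
        op.execute(None)

        # The object should now exist and contain the uploaded payload.
        body = conn.get_object(Bucket="test-airflow-bucket", Key="test-airflow-key")["Body"].read()
        assert body == b"apple,0.5"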