Posted to commits@airflow.apache.org by el...@apache.org on 2022/04/20 06:42:49 UTC
[airflow] branch main updated: Add `S3CreateObjectOperator` (#22758)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9e1ac6e425 Add `S3CreateObjectOperator` (#22758)
9e1ac6e425 is described below
commit 9e1ac6e425aa52a55601bb2b5587fd97d361bfcc
Author: Vincent <97...@users.noreply.github.com>
AuthorDate: Wed Apr 20 00:42:37 2022 -0600
Add `S3CreateObjectOperator` (#22758)
* Add S3CreateObjectOperator
Co-authored-by: eladkal <45...@users.noreply.github.com>
---
.../amazon/aws/example_dags/example_s3.py | 21 +++++
airflow/providers/amazon/aws/operators/s3.py | 88 ++++++++++++++++++
.../operators/s3.rst | 15 +++
.../amazon/aws/operators/test_s3_copy_object.py | 83 -----------------
...test_s3_delete_objects.py => test_s3_object.py} | 101 ++++++++++++++++++++-
5 files changed, 222 insertions(+), 86 deletions(-)
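For orientation before the diff: a minimal usage sketch of the operator this commit introduces. The bucket name, key, and payload below are hypothetical, the task would normally live inside a DAG such as the example_s3.py DAG updated further down, and the default 'aws_default' connection is assumed.

from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

# Hypothetical bucket/key; any bucket reachable through 'aws_default' works.
create_prices_csv = S3CreateObjectOperator(
    task_id="create_prices_csv",
    s3_bucket="my-example-bucket",
    s3_key="data/prices.csv",
    data="apple,0.5\nmilk,2.5\nbread,4.0\n",  # a str payload is written via S3Hook.load_string
    replace=True,  # overwrite the key if it already exists
)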
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3.py b/airflow/providers/amazon/aws/example_dags/example_s3.py
index 72aa6d238d..81508f365c 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3.py
@@ -24,6 +24,7 @@ from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.s3 import (
S3CopyObjectOperator,
S3CreateBucketOperator,
+ S3CreateObjectOperator,
S3DeleteBucketOperator,
S3DeleteBucketTaggingOperator,
S3DeleteObjectsOperator,
@@ -38,6 +39,15 @@ KEY = os.environ.get('KEY', 'key')
KEY_2 = os.environ.get('KEY_2', 'key2')
TAG_KEY = os.environ.get('TAG_KEY', 'test-s3-bucket-tagging-key')
TAG_VALUE = os.environ.get('TAG_VALUE', 'test-s3-bucket-tagging-value')
+KEY = os.environ.get('KEY', 'key')
+DATA = os.environ.get(
+ 'DATA',
+ '''
+apple,0.5
+milk,2.5
+bread,4.0
+''',
+)
with DAG(
dag_id='example_s3',
@@ -92,6 +102,16 @@ with DAG(
)
# [END howto_operator_s3_delete_bucket_tagging]
+ # [START howto_operator_s3_create_object]
+ s3_create_object = S3CreateObjectOperator(
+ task_id="s3_create_object",
+ s3_bucket=BUCKET_NAME,
+ s3_key=KEY,
+ data=DATA,
+ replace=True,
+ )
+ # [END howto_operator_s3_create_object]
+
# [START howto_sensor_s3_key_single_key]
# Check if a file exists
s3_sensor_one_key = S3KeySensor(
@@ -149,6 +169,7 @@ with DAG(
put_tagging,
get_tagging,
delete_tagging,
+ s3_create_object,
[s3_sensor_one_key, s3_sensor_two_keys, s3_sensor_key_function],
s3_copy_object,
s3_delete_objects,
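Since the new operator declares `s3_bucket`, `s3_key` and `data` as template fields (see the operators/s3.py hunk below), the example above could also render the key per run with Jinja. A hedged variation of the committed example, not part of this commit; the key pattern is hypothetical:

s3_create_daily_object = S3CreateObjectOperator(
    task_id="s3_create_daily_object",
    s3_bucket=BUCKET_NAME,
    s3_key="exports/{{ ds }}/prices.csv",  # rendered with the run's logical date
    data=DATA,
    replace=True,
)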
diff --git a/airflow/providers/amazon/aws/operators/s3.py b/airflow/providers/amazon/aws/operators/s3.py
index fb543048f6..2ed7983e1a 100644
--- a/airflow/providers/amazon/aws/operators/s3.py
+++ b/airflow/providers/amazon/aws/operators/s3.py
@@ -322,6 +322,94 @@ class S3CopyObjectOperator(BaseOperator):
)
+class S3CreateObjectOperator(BaseOperator):
+ """
+ Creates a new S3 object from `data`, provided either as a string or as bytes.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:S3CreateObjectOperator`
+
+ :param s3_bucket: Name of the S3 bucket in which to save the object. (templated)
+ It should be omitted when `s3_key` is provided as a full s3:// url.
+ :param s3_key: The key of the object to be created. (templated)
+ It can be either a full s3:// style url or a relative path from the root level.
+ When it is specified as a full s3:// url, please omit `s3_bucket`.
+ :param data: string or bytes to save as content.
+ :param replace: If True, it will overwrite the key if it already exists
+ :param encrypt: If True, the file will be encrypted on the server-side
+ by S3 and will be stored in an encrypted form while at rest in S3.
+ :param acl_policy: String specifying the canned ACL policy for the file being
+ uploaded to the S3 bucket.
+ :param encoding: The string-to-bytes encoding.
+ It should be specified only when `data` is provided as a string.
+ :param compression: Type of compression to use; currently only gzip is supported.
+ It can be specified only when `data` is provided as a string.
+ :param aws_conn_id: Connection id of the S3 connection to use.
+ :param verify: Whether or not to verify SSL certificates for the S3 connection.
+ By default, SSL certificates are verified.
+
+ You can provide the following values:
+
+ - False: do not validate SSL certificates. SSL will still be used,
+ but SSL certificates will not be
+ verified.
+ - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+ You can specify this argument if you want to use a different
+ CA cert bundle than the one used by botocore.
+
+ """
+
+ template_fields: Sequence[str] = ('s3_bucket', 's3_key', 'data')
+
+ def __init__(
+ self,
+ *,
+ s3_bucket: Optional[str] = None,
+ s3_key: str,
+ data: Union[str, bytes],
+ replace: bool = False,
+ encrypt: bool = False,
+ acl_policy: Optional[str] = None,
+ encoding: Optional[str] = None,
+ compression: Optional[str] = None,
+ aws_conn_id: str = 'aws_default',
+ verify: Optional[Union[str, bool]] = None,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+
+ self.s3_bucket = s3_bucket
+ self.s3_key = s3_key
+ self.data = data
+ self.replace = replace
+ self.encrypt = encrypt
+ self.acl_policy = acl_policy
+ self.encoding = encoding
+ self.compression = compression
+ self.aws_conn_id = aws_conn_id
+ self.verify = verify
+
+ def execute(self, context: 'Context'):
+ s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+
+ if isinstance(self.data, str):
+ s3_hook.load_string(
+ self.data,
+ self.s3_key,
+ self.s3_bucket,
+ self.replace,
+ self.encrypt,
+ self.encoding,
+ self.acl_policy,
+ self.compression,
+ )
+ else:
+ s3_hook.load_bytes(
+ self.data, self.s3_key, self.s3_bucket, self.replace, self.encrypt, self.acl_policy
+ )
+
+
class S3DeleteObjectsOperator(BaseOperator):
"""
To enable users to delete single object or multiple objects from
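As an aside on the execute() method above: the positional arguments map onto S3Hook.load_string and S3Hook.load_bytes. A keyword-argument sketch of the same two calls, shown only for readability and not part of the commit; the bucket and key names are hypothetical, and the keyword names are taken from the S3Hook signatures as of this provider version:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

hook = S3Hook(aws_conn_id="aws_default")

# str payloads support an encoding and optional gzip compression
hook.load_string(
    string_data="apple,0.5\n",
    key="data/prices.csv",
    bucket_name="my-example-bucket",
    replace=True,
    encrypt=False,
    encoding="utf-8",
    acl_policy=None,
    compression="gzip",
)

# bytes payloads take neither encoding nor compression
hook.load_bytes(
    bytes_data=b"\x00\x01",
    key="data/blob.bin",
    bucket_name="my-example-bucket",
    replace=True,
)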
diff --git a/docs/apache-airflow-providers-amazon/operators/s3.rst b/docs/apache-airflow-providers-amazon/operators/s3.rst
index 6b7f385d10..aeb04b9dc4 100644
--- a/docs/apache-airflow-providers-amazon/operators/s3.rst
+++ b/docs/apache-airflow-providers-amazon/operators/s3.rst
@@ -28,6 +28,7 @@ Airflow to Amazon Simple Storage Service (S3) integration provides several opera
- :class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteBucketTaggingOperator`
- :class:`~airflow.providers.amazon.aws.operators.s3.S3GetBucketTaggingOperator`
- :class:`~airflow.providers.amazon.aws.operators.s3.S3PutBucketTaggingOperator`
+ - :class:`~airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator`
- :class:`~airflow.providers.amazon.aws.operators.s3.S3CopyObjectOperator`
- :class:`~airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator`
- :class:`~airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator`
@@ -164,6 +165,20 @@ multiple files can match one key. The list of matched S3 object attributes conta
:start-after: [START howto_sensor_s3_key_function]
:end-before: [END howto_sensor_s3_key_function]
+.. _howto/operator:S3CreateObjectOperator:
+
+Create an Amazon S3 object
+--------------------------
+
+To create a new Amazon S3 object (or replace an existing one) you can use
+:class:`~airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_s3_create_object]
+ :end-before: [END howto_operator_s3_create_object]
+
.. _howto/operator:S3CopyObjectOperator:
Copy an Amazon S3 object
diff --git a/tests/providers/amazon/aws/operators/test_s3_copy_object.py b/tests/providers/amazon/aws/operators/test_s3_copy_object.py
deleted file mode 100644
index a2f4f9eddb..0000000000
--- a/tests/providers/amazon/aws/operators/test_s3_copy_object.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import io
-import unittest
-
-import boto3
-from moto import mock_s3
-
-from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
-
-
-class TestS3CopyObjectOperator(unittest.TestCase):
- def setUp(self):
- self.source_bucket = "bucket1"
- self.source_key = "path1/data.txt"
- self.dest_bucket = "bucket2"
- self.dest_key = "path2/data_copy.txt"
-
- @mock_s3
- def test_s3_copy_object_arg_combination_1(self):
- conn = boto3.client('s3')
- conn.create_bucket(Bucket=self.source_bucket)
- conn.create_bucket(Bucket=self.dest_bucket)
- conn.upload_fileobj(Bucket=self.source_bucket, Key=self.source_key, Fileobj=io.BytesIO(b"input"))
-
- # there should be nothing found before S3CopyObjectOperator is executed
- assert 'Contents' not in conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
-
- op = S3CopyObjectOperator(
- task_id="test_task_s3_copy_object",
- source_bucket_key=self.source_key,
- source_bucket_name=self.source_bucket,
- dest_bucket_key=self.dest_key,
- dest_bucket_name=self.dest_bucket,
- )
- op.execute(None)
-
- objects_in_dest_bucket = conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
- # there should be object found, and there should only be one object found
- assert len(objects_in_dest_bucket['Contents']) == 1
- # the object found should be consistent with dest_key specified earlier
- assert objects_in_dest_bucket['Contents'][0]['Key'] == self.dest_key
-
- @mock_s3
- def test_s3_copy_object_arg_combination_2(self):
- conn = boto3.client('s3')
- conn.create_bucket(Bucket=self.source_bucket)
- conn.create_bucket(Bucket=self.dest_bucket)
- conn.upload_fileobj(Bucket=self.source_bucket, Key=self.source_key, Fileobj=io.BytesIO(b"input"))
-
- # there should be nothing found before S3CopyObjectOperator is executed
- assert 'Contents' not in conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
-
- source_key_s3_url = f"s3://{self.source_bucket}/{self.source_key}"
- dest_key_s3_url = f"s3://{self.dest_bucket}/{self.dest_key}"
- op = S3CopyObjectOperator(
- task_id="test_task_s3_copy_object",
- source_bucket_key=source_key_s3_url,
- dest_bucket_key=dest_key_s3_url,
- )
- op.execute(None)
-
- objects_in_dest_bucket = conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
- # there should be object found, and there should only be one object found
- assert len(objects_in_dest_bucket['Contents']) == 1
- # the object found should be consistent with dest_key specified earlier
- assert objects_in_dest_bucket['Contents'][0]['Key'] == self.dest_key
diff --git a/tests/providers/amazon/aws/operators/test_s3_delete_objects.py b/tests/providers/amazon/aws/operators/test_s3_object.py
similarity index 67%
rename from tests/providers/amazon/aws/operators/test_s3_delete_objects.py
rename to tests/providers/amazon/aws/operators/test_s3_object.py
index b878f1dc7e..3bf993fab5 100644
--- a/tests/providers/amazon/aws/operators/test_s3_delete_objects.py
+++ b/tests/providers/amazon/aws/operators/test_s3_object.py
@@ -15,15 +15,82 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
import io
import unittest
+from unittest import mock
import boto3
from moto import mock_s3
-from airflow.exceptions import AirflowException
-from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator
+from airflow import AirflowException
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.s3 import (
+ S3CopyObjectOperator,
+ S3CreateObjectOperator,
+ S3DeleteObjectsOperator,
+)
+
+S3_BUCKET = "test-airflow-bucket"
+S3_KEY = "test-airflow-key"
+TASK_ID = "test-s3-operator"
+
+
+class TestS3CopyObjectOperator(unittest.TestCase):
+ def setUp(self):
+ self.source_bucket = "bucket1"
+ self.source_key = "path1/data.txt"
+ self.dest_bucket = "bucket2"
+ self.dest_key = "path2/data_copy.txt"
+
+ @mock_s3
+ def test_s3_copy_object_arg_combination_1(self):
+ conn = boto3.client('s3')
+ conn.create_bucket(Bucket=self.source_bucket)
+ conn.create_bucket(Bucket=self.dest_bucket)
+ conn.upload_fileobj(Bucket=self.source_bucket, Key=self.source_key, Fileobj=io.BytesIO(b"input"))
+
+ # there should be nothing found before S3CopyObjectOperator is executed
+ assert 'Contents' not in conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
+
+ op = S3CopyObjectOperator(
+ task_id="test_task_s3_copy_object",
+ source_bucket_key=self.source_key,
+ source_bucket_name=self.source_bucket,
+ dest_bucket_key=self.dest_key,
+ dest_bucket_name=self.dest_bucket,
+ )
+ op.execute(None)
+
+ objects_in_dest_bucket = conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
+ # there should be object found, and there should only be one object found
+ assert len(objects_in_dest_bucket['Contents']) == 1
+ # the object found should be consistent with dest_key specified earlier
+ assert objects_in_dest_bucket['Contents'][0]['Key'] == self.dest_key
+
+ @mock_s3
+ def test_s3_copy_object_arg_combination_2(self):
+ conn = boto3.client('s3')
+ conn.create_bucket(Bucket=self.source_bucket)
+ conn.create_bucket(Bucket=self.dest_bucket)
+ conn.upload_fileobj(Bucket=self.source_bucket, Key=self.source_key, Fileobj=io.BytesIO(b"input"))
+
+ # there should be nothing found before S3CopyObjectOperator is executed
+ assert 'Contents' not in conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
+
+ source_key_s3_url = f"s3://{self.source_bucket}/{self.source_key}"
+ dest_key_s3_url = f"s3://{self.dest_bucket}/{self.dest_key}"
+ op = S3CopyObjectOperator(
+ task_id="test_task_s3_copy_object",
+ source_bucket_key=source_key_s3_url,
+ dest_bucket_key=dest_key_s3_url,
+ )
+ op.execute(None)
+
+ objects_in_dest_bucket = conn.list_objects(Bucket=self.dest_bucket, Prefix=self.dest_key)
+ # there should be object found, and there should only be one object found
+ assert len(objects_in_dest_bucket['Contents']) == 1
+ # the object found should be consistent with dest_key specified earlier
+ assert objects_in_dest_bucket['Contents'][0]['Key'] == self.dest_key
class TestS3DeleteObjectsOperator(unittest.TestCase):
@@ -187,3 +254,31 @@ class TestS3DeleteObjectsOperator(unittest.TestCase):
assert len(objects_in_dest_bucket['Contents']) == 1
# the object found should be consistent with dest_key specified earlier
assert objects_in_dest_bucket['Contents'][0]['Key'] == key_of_test
+
+
+class TestS3CreateObjectOperator(unittest.TestCase):
+ @mock.patch.object(S3Hook, "load_string")
+ def test_execute_if_data_is_string(self, mock_load_string):
+ data = "data"
+ operator = S3CreateObjectOperator(
+ task_id=TASK_ID,
+ s3_bucket=S3_BUCKET,
+ s3_key=S3_KEY,
+ data=data,
+ )
+ operator.execute(None)
+
+ mock_load_string.assert_called_once_with(data, S3_KEY, S3_BUCKET, False, False, None, None, None)
+
+ @mock.patch.object(S3Hook, "load_bytes")
+ def test_execute_if_data_is_bytes(self, mock_load_bytes):
+ data = b"data"
+ operator = S3CreateObjectOperator(
+ task_id=TASK_ID,
+ s3_bucket=S3_BUCKET,
+ s3_key=S3_KEY,
+ data=data,
+ )
+ operator.execute(None)
+
+ mock_load_bytes.assert_called_once_with(data, S3_KEY, S3_BUCKET, False, False, None)
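The new tests above patch the hook methods, so nothing is actually written to (mocked) S3. A round-trip check in the style of the moto-based copy-object tests earlier in this file could look roughly like the sketch below; it is not part of this commit and assumes the same test environment (moto installed, no real AWS credentials needed):

class TestS3CreateObjectOperatorRoundTrip(unittest.TestCase):
    @mock_s3
    def test_create_object_round_trip(self):
        # create the target bucket in the mocked S3 backend
        conn = boto3.client('s3')
        conn.create_bucket(Bucket=S3_BUCKET)

        S3CreateObjectOperator(
            task_id=TASK_ID,
            s3_bucket=S3_BUCKET,
            s3_key=S3_KEY,
            data="data",
            replace=True,
        ).execute(None)

        # the object should now exist and contain the payload written above
        body = conn.get_object(Bucket=S3_BUCKET, Key=S3_KEY)['Body'].read()
        assert body == b"data"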