Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/24 00:13:08 UTC

[GitHub] stale[bot] closed pull request #3475: [AIRFLOW-2315] Add s3_upload_args to put methods on S3Hook

URL: https://github.com/apache/incubator-airflow/pull/3475

This is a pull request from a forked repository. As GitHub hides the
original diff once such a PR is closed, it is reproduced below for the
sake of provenance; brief usage sketches of the new parameter follow
the diff:

diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index 2d64b31534..00ee14021c 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -282,7 +282,8 @@ def load_file(self,
                   key,
                   bucket_name=None,
                   replace=False,
-                  encrypt=False):
+                  encrypt=False,
+                  upload_args=None):
         """
         Loads a local file to S3
 
@@ -299,16 +300,15 @@ def load_file(self,
         :param encrypt: If True, the file will be encrypted on the server-side
             by S3 and will be stored in an encrypted form while at rest in S3.
         :type encrypt: bool
+        :param upload_args: S3 upload's "ExtraArgs"
+            See: boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS
+        :type upload_args: dict
         """
-        if not bucket_name:
-            (bucket_name, key) = self.parse_s3_url(key)
-
-        if not replace and self.check_for_key(key, bucket_name):
-            raise ValueError("The key {key} already exists.".format(key=key))
-
-        extra_args = {}
-        if encrypt:
-            extra_args['ServerSideEncryption'] = "AES256"
+        (bucket_name, key, extra_args) = self._prepare_load(key,
+                                                            bucket_name,
+                                                            replace,
+                                                            encrypt,
+                                                            upload_args)
 
         client = self.get_conn()
         client.upload_file(filename, bucket_name, key, ExtraArgs=extra_args)
@@ -319,7 +319,8 @@ def load_string(self,
                     bucket_name=None,
                     replace=False,
                     encrypt=False,
-                    encoding='utf-8'):
+                    encoding='utf-8',
+                    upload_args=None):
         """
         Loads a string to S3
 
@@ -338,19 +339,24 @@ def load_string(self,
         :param encrypt: If True, the file will be encrypted on the server-side
             by S3 and will be stored in an encrypted form while at rest in S3.
         :type encrypt: bool
+        :param upload_args: S3 upload's "ExtraArgs"
+            See: boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS
+        :type upload_args: dict
         """
         self.load_bytes(string_data.encode(encoding),
                         key=key,
                         bucket_name=bucket_name,
                         replace=replace,
-                        encrypt=encrypt)
+                        encrypt=encrypt,
+                        upload_args=upload_args)
 
     def load_bytes(self,
                    bytes_data,
                    key,
                    bucket_name=None,
                    replace=False,
-                   encrypt=False):
+                   encrypt=False,
+                   upload_args=None):
         """
         Loads bytes to S3
 
@@ -369,18 +375,69 @@ def load_bytes(self,
         :param encrypt: If True, the file will be encrypted on the server-side
             by S3 and will be stored in an encrypted form while at rest in S3.
         :type encrypt: bool
+        :param upload_args: S3 upload's "ExtraArgs"
+            See: boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS
+        :type upload_args: dict
         """
+        (bucket_name, key, extra_args) = self._prepare_load(key,
+                                                            bucket_name,
+                                                            replace,
+                                                            encrypt,
+                                                            upload_args)
+
+        filelike_buffer = BytesIO(bytes_data)
+
+        client = self.get_conn()
+        client.upload_fileobj(filelike_buffer, bucket_name, key, ExtraArgs=extra_args)
+
+    def _prepare_load(self,
+                      key,
+                      bucket_name=None,
+                      replace=False,
+                      encrypt=False,
+                      upload_args=None):
+        """
+        Prepares S3 load by:
+            1. Examining arguments
+            2. Checking key existence
+            3. Determining Upload "ExtraArgs"
+
+        ExtraArg Order of Precedence:
+            1) "encrypt" argument
+            1) "upload_args" argument
+            3) Connection's json extra property: "s3_upload_args"
+
+        Returns a tuple of the prepared values: (bucket_name, key, extra_args)
+
+        :param key: S3 key that will point to the file
+        :type key: str
+        :param bucket_name: Name of the bucket in which to store the file
+        :type bucket_name: str
+        :param replace: A flag to decide whether or not to overwrite the key
+            if it already exists
+        :type replace: bool
+        :param encrypt: If True, the file will be encrypted on the server-side
+            by S3 and will be stored in an encrypted form while at rest in S3.
+        :type encrypt: bool
+        :param upload_args: S3 upload's "ExtraArgs"
+            See: boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS
+        :type upload_args: dict
+        """
+        if not upload_args:
+            upload_args = {}
         if not bucket_name:
             (bucket_name, key) = self.parse_s3_url(key)
 
         if not replace and self.check_for_key(key, bucket_name):
             raise ValueError("The key {key} already exists.".format(key=key))
-
-        extra_args = {}
+        applied_upload_args = {}
+        if self.aws_conn_id:
+            connection_object = self.get_connection(self.aws_conn_id)
+            if 's3_upload_args' in connection_object.extra_dejson:
+                applied_upload_args.update(
+                    connection_object.extra_dejson.get('s3_upload_args'))
+        applied_upload_args.update(upload_args)
         if encrypt:
-            extra_args['ServerSideEncryption'] = "AES256"
+            applied_upload_args['ServerSideEncryption'] = "AES256"
 
-        filelike_buffer = BytesIO(bytes_data)
-
-        client = self.get_conn()
-        client.upload_fileobj(filelike_buffer, bucket_name, key, ExtraArgs=extra_args)
+        return (bucket_name, key, applied_upload_args)
diff --git a/tests/hooks/test_s3_hook.py b/tests/hooks/test_s3_hook.py
index 27dfba49a5..ef75f48f85 100644
--- a/tests/hooks/test_s3_hook.py
+++ b/tests/hooks/test_s3_hook.py
@@ -24,6 +24,7 @@
 from botocore.exceptions import NoCredentialsError
 
 from airflow import configuration
+from airflow.models import Connection
 
 try:
     from airflow.hooks.S3_hook import S3Hook
@@ -242,6 +243,66 @@ def test_load_string(self):
 
         self.assertEqual(body, b'Cont\xC3\xA9nt')
 
+        # [AIRFLOW-2315] Testing with argument for new param `upload_args`
+        hook.load_string(u"Contént",
+                         "my_key_2",
+                         "mybucket",
+                         upload_args={
+                             "Metadata": {
+                                 "mykey": "my_value_from_param"
+                             }
+                         })
+        metadata = boto3.resource('s3').Object('mybucket', 'my_key_2').get()['Metadata']
+
+        self.assertEqual(metadata, {"mykey": "my_value_from_param"})
+
+    @mock_s3
+    @mock.patch('airflow.contrib.hooks.aws_hook.AwsHook.get_connection')
+    def test_load_string_with_extras(self, mock_get_connection):
+        """
+        [AIRFLOW-2315] Testing with new json extras
+        """
+        mock_connection = Connection(
+            extra=(
+                '{"s3_upload_args":{'
+                '"Metadata":{'
+                '"mykey": "my_value_from_json_extras" } } }'
+            )
+        )
+        mock_get_connection.return_value = mock_connection
+
+        hook = S3Hook()
+        conn = hook.get_conn()
+        # We need to create the bucket since this is all in Moto's 'virtual'
+        # AWS account
+        conn.create_bucket(Bucket="mybucket")
+
+        hook.load_string(u"Contént", "my_key", "mybucket")
+        body = boto3.resource('s3').Object('mybucket', 'my_key').get()['Body'].read()
+
+        self.assertEqual(body, b'Cont\xC3\xA9nt')
+
+        metadata = boto3.resource('s3').Object('mybucket', 'my_key').get()['Metadata']
+
+        self.assertEqual(metadata, {"mykey": "my_value_from_json_extras"})
+
+        # [AIRFLOW-2315] Testing with argument for new param `upload_args`
+        hook.load_string(u"Contént",
+                         "my_key_2",
+                         "mybucket",
+                         upload_args={
+                             "Metadata": {
+                                 "mykey": "my_value_from_param"
+                             }
+                         })
+        body2 = boto3.resource('s3').Object('mybucket', 'my_key_2').get()['Body'].read()
+
+        self.assertEqual(body2, b'Cont\xC3\xA9nt')
+
+        metadata2 = boto3.resource('s3').Object('mybucket', 'my_key_2').get()['Metadata']
+
+        self.assertEqual(metadata2, {"mykey": "my_value_from_param"})
+
     @mock_s3
     def test_load_bytes(self):
         hook = S3Hook(aws_conn_id=None)
@@ -250,10 +311,70 @@ def test_load_bytes(self):
         # AWS account
         conn.create_bucket(Bucket="mybucket")
 
-        hook.load_bytes(b"Content", "my_key", "mybucket")
+        hook.load_bytes(b"Cont\xC3\xA9nt", "my_key", "mybucket")
         body = boto3.resource('s3').Object('mybucket', 'my_key').get()['Body'].read()
 
-        self.assertEqual(body, b'Content')
+        self.assertEqual(body, b'Cont\xC3\xA9nt')
+
+        # [AIRFLOW-2315] Testing with argument for new param `upload_args`
+        hook.load_bytes(b"Cont\xC3\xA9nt",
+                        "my_key_2",
+                        "mybucket",
+                        upload_args={
+                            "Metadata": {
+                                "mykey": "my_value_from_param"
+                            }
+                        })
+        metadata = boto3.resource('s3').Object('mybucket', 'my_key_2').get()['Metadata']
+
+        self.assertEqual(metadata, {"mykey": "my_value_from_param"})
+
+    @mock_s3
+    @mock.patch('airflow.contrib.hooks.aws_hook.AwsHook.get_connection')
+    def test_load_bytes_with_extras(self, mock_get_connection):
+        """
+        [AIRFLOW-2315] Testing with new json extras
+        """
+        mock_connection = Connection(
+            extra=(
+                '{"s3_upload_args":{'
+                '"Metadata":{'
+                '"mykey": "my_value_from_json_extras" } } }'
+            )
+        )
+        mock_get_connection.return_value = mock_connection
+
+        hook = S3Hook()
+        conn = hook.get_conn()
+        # We need to create the bucket since this is all in Moto's 'virtual'
+        # AWS account
+        conn.create_bucket(Bucket="mybucket")
+
+        hook.load_bytes(b"Cont\xC3\xA9nt", "my_key", "mybucket")
+        body = boto3.resource('s3').Object('mybucket', 'my_key').get()['Body'].read()
+
+        self.assertEqual(body, b'Cont\xC3\xA9nt')
+
+        metadata = boto3.resource('s3').Object('mybucket', 'my_key').get()['Metadata']
+
+        self.assertEqual(metadata, {"mykey": "my_value_from_json_extras"})
+
+        # [AIRFLOW-2315] Testing with argument for new param `upload_args`
+        hook.load_bytes(b"Cont\xC3\xA9nt",
+                        "my_key_2",
+                        "mybucket",
+                        upload_args={
+                            "Metadata": {
+                                "mykey": "my_value_from_param"
+                            }
+                        })
+        body2 = boto3.resource('s3').Object('mybucket', 'my_key_2').get()['Body'].read()
+
+        self.assertEqual(body2, b'Cont\xC3\xA9nt')
+
+        metadata2 = boto3.resource('s3').Object('mybucket', 'my_key_2').get()['Metadata']
+
+        self.assertEqual(metadata2, {"mykey": "my_value_from_param"})
 
 
 if __name__ == '__main__':
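
For context, here is a minimal usage sketch of the new parameter,
assuming the patch above were applied. The connection id, bucket, and
key names are illustrative only, not part of the PR:

    # Hypothetical usage of the upload_args parameter added by this patch.
    # 'my_s3_conn', 'my_bucket', and 'my_key' are illustrative names.
    from airflow.hooks.S3_hook import S3Hook

    hook = S3Hook(aws_conn_id='my_s3_conn')

    # Any key from boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS can be
    # passed through, e.g. per-object metadata or a storage class:
    hook.load_string('some content',
                     key='my_key',
                     bucket_name='my_bucket',
                     upload_args={'Metadata': {'mykey': 'my_value'},
                                  'StorageClass': 'STANDARD_IA'})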

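Per the _prepare_load docstring above, default upload args can also be
set in the connection's JSON extra. A sketch of that configuration
follows; the conn_id is illustrative, and "s3_upload_args" is the extra
key this patch introduces:

    # Hypothetical connection carrying default upload args. Every
    # load_file / load_string / load_bytes call made through this
    # connection picks up the metadata below; an explicit upload_args=
    # argument overrides it, and encrypt=True overrides both by setting
    # ServerSideEncryption to AES256.
    from airflow.models import Connection

    conn = Connection(
        conn_id='my_s3_conn',
        conn_type='aws',
        extra='{"s3_upload_args": {"Metadata": {"mykey": "my_default"}}}')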

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services