You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/11/22 14:03:26 UTC

incubator-airflow git commit: [AIRFLOW-1839] Fix more bugs in S3Hook boto -> boto3 migration

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6b1ceff7d -> 2d5408935


[AIRFLOW-1839] Fix more bugs in S3Hook boto -> boto3 migration

There were some more bugs as a result of the boto
to boto3 migration
that weren't covered by existing tests. Now they
are fixed and covered.
Hopefully I got everything this time.

Closes #2805 from ashb/AIRFLOW-1839-s3
-hook_loadsa-tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d540893
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d540893
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d540893

Branch: refs/heads/master
Commit: 2d5408935fc41c3d3b6618d8c563d1eecac06561
Parents: 6b1ceff
Author: Ash Berlin-Taylor <as...@firemirror.com>
Authored: Wed Nov 22 15:03:15 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Nov 22 15:03:15 2017 +0100

----------------------------------------------------------------------
 airflow/hooks/S3_hook.py    |  22 +++----
 tests/hooks/test_s3_hook.py | 131 ++++++++++++++++++++++++++++++++++++---
 2 files changed, 134 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d540893/airflow/hooks/S3_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index 226b520..717ba33 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -58,7 +58,7 @@ class S3Hook(AwsHook):
         :param bucket_name: the name of the bucket
         :type bucket_name: str
         """
-        s3 = self.get_resource('s3')
+        s3 = self.get_resource_type('s3')
         return s3.Bucket(bucket_name)
 
     def check_for_prefix(self, bucket_name, prefix, delimiter):
@@ -69,7 +69,7 @@ class S3Hook(AwsHook):
         prefix_split = re.split(r'(\w+[{d}])$'.format(d=delimiter), prefix, 1)
         previous_level = prefix_split[0]
         plist = self.list_prefixes(bucket_name, previous_level, delimiter)
-        return False if plist is None else prefix in plist        
+        return False if plist is None else prefix in plist
 
     def list_prefixes(self, bucket_name, prefix='', delimiter=''):
         """
@@ -85,7 +85,7 @@ class S3Hook(AwsHook):
         response = self.get_conn().list_objects_v2(Bucket=bucket_name, 
                                                    Prefix=prefix, 
                                                    Delimiter=delimiter)
-        return [p.Prefix for p in response['CommonPrefixes']] if response.get('CommonPrefixes') else None
+        return [p['Prefix'] for p in response['CommonPrefixes']] if response.get('CommonPrefixes') else None
 
     def list_keys(self, bucket_name, prefix='', delimiter=''):
         """
@@ -98,10 +98,10 @@ class S3Hook(AwsHook):
         :param delimiter: the delimiter marks key hierarchy.
         :type delimiter: str
         """
-        response = self.get_conn().list_objects_v2(Bucket=bucket_name, 
-                                                   Prefix=prefix, 
+        response = self.get_conn().list_objects_v2(Bucket=bucket_name,
+                                                   Prefix=prefix,
                                                    Delimiter=delimiter)
-        return [k.Key for k in response['Contents']] if response.get('Contents') else None
+        return [k['Key'] for k in response['Contents']] if response.get('Contents') else None
 
     def check_for_key(self, key, bucket_name=None):
         """
@@ -114,7 +114,7 @@ class S3Hook(AwsHook):
         """
         if not bucket_name:
             (bucket_name, key) = self.parse_s3_url(key)
-        
+
         try:
             self.get_conn().head_object(Bucket=bucket_name, Key=key)
             return True
@@ -170,7 +170,7 @@ class S3Hook(AwsHook):
         """
         if not bucket_name:
             (bucket_name, wildcard_key) = self.parse_s3_url(wildcard_key)
-        
+
         prefix = re.split(r'[*]', wildcard_key, 1)[0]
         klist = self.list_keys(bucket_name, prefix=prefix, delimiter=delimiter)
         if klist:
@@ -203,14 +203,14 @@ class S3Hook(AwsHook):
         """
         if not bucket_name:
             (bucket_name, key) = self.parse_s3_url(key)
-        
+
         if not replace and self.check_for_key(key, bucket_name):
             raise ValueError("The key {key} already exists.".format(key=key))
-        
+
         extra_args={}
         if encrypt:
             extra_args['ServerSideEncryption'] = "AES256"
-        
+
         client = self.get_conn()
         client.upload_file(filename, bucket_name, key, ExtraArgs=extra_args)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d540893/tests/hooks/test_s3_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_s3_hook.py b/tests/hooks/test_s3_hook.py
index 48c9fde..d392ea0 100644
--- a/tests/hooks/test_s3_hook.py
+++ b/tests/hooks/test_s3_hook.py
@@ -35,6 +35,7 @@ except ImportError:
 @unittest.skipIf(mock_s3 is None,
                  "Skipping test because moto.mock_s3 is not available")
 class TestS3Hook(unittest.TestCase):
+
     def setUp(self):
         configuration.load_test_config()
         self.s3_test_url = "s3://test/this/is/not/a-real-key.txt"
@@ -44,19 +45,79 @@ class TestS3Hook(unittest.TestCase):
         self.assertEqual(parsed,
                          ("test", "this/is/not/a-real-key.txt"),
                          "Incorrect parsing of the s3 url")
+    @mock_s3
+    def test_check_for_bucket(self):
+        hook = S3Hook(aws_conn_id=None)
+        b = hook.get_bucket('bucket')
+        b.create()
+
+        self.assertTrue(hook.check_for_bucket('bucket'))
+        self.assertFalse(hook.check_for_bucket('not-a-bucket'))
 
     @mock_s3
-    def test_load_string(self):
+    def test_get_bucket(self):
         hook = S3Hook(aws_conn_id=None)
-        conn = hook.get_conn()
-        # We need to create the bucket since this is all in Moto's 'virtual'
-        # AWS account
-        conn.create_bucket(Bucket="mybucket")
+        b = hook.get_bucket('bucket')
+        self.assertIsNotNone(b)
 
-        hook.load_string(u"Contént", "my_key", "mybucket")
-        body = boto3.resource('s3').Object('mybucket', 'my_key').get()['Body'].read()
+    @mock_s3
+    def test_check_for_prefix(self):
+        hook = S3Hook(aws_conn_id=None)
+        b = hook.get_bucket('bucket')
+        b.create()
+        b.put_object(Key='a', Body=b'a')
+        b.put_object(Key='dir/b', Body=b'b')
 
-        self.assertEqual(body, b'Cont\xC3\xA9nt')
+        self.assertTrue(hook.check_for_prefix('bucket', prefix='dir/', delimiter='/'))
+        self.assertFalse(hook.check_for_prefix('bucket', prefix='a', delimiter='/'))
+
+    @mock_s3
+    def test_list_prefixes(self):
+        hook = S3Hook(aws_conn_id=None)
+        b = hook.get_bucket('bucket')
+        b.create()
+        b.put_object(Key='a', Body=b'a')
+        b.put_object(Key='dir/b', Body=b'b')
+
+        self.assertIsNone(hook.list_prefixes('bucket', prefix='non-existent/'))
+        self.assertListEqual(['dir/'], hook.list_prefixes('bucket', delimiter='/'))
+        self.assertListEqual(['a'], hook.list_keys('bucket', delimiter='/'))
+        self.assertListEqual(['dir/b'], hook.list_keys('bucket', prefix='dir/'))
+
+    @mock_s3
+    def test_list_keys(self):
+        hook = S3Hook(aws_conn_id=None)
+        b = hook.get_bucket('bucket')
+        b.create()
+        b.put_object(Key='a', Body=b'a')
+        b.put_object(Key='dir/b', Body=b'b')
+
+        self.assertIsNone(hook.list_keys('bucket', prefix='non-existent/'))
+        self.assertListEqual(['a', 'dir/b'], hook.list_keys('bucket'))
+        self.assertListEqual(['a'], hook.list_keys('bucket', delimiter='/'))
+        self.assertListEqual(['dir/b'], hook.list_keys('bucket', prefix='dir/'))
+
+    @mock_s3
+    def test_check_for_key(self):
+        hook = S3Hook(aws_conn_id=None)
+        b = hook.get_bucket('bucket')
+        b.create()
+        b.put_object(Key='a', Body=b'a')
+
+        self.assertTrue(hook.check_for_key('a', 'bucket'))
+        self.assertTrue(hook.check_for_key('s3://bucket//a'))
+        self.assertFalse(hook.check_for_key('b', 'bucket'))
+        self.assertFalse(hook.check_for_key('s3://bucket//b'))
+
+    @mock_s3
+    def test_get_key(self):
+        hook = S3Hook(aws_conn_id=None)
+        b = hook.get_bucket('bucket')
+        b.create()
+        b.put_object(Key='a', Body=b'a')
+
+        self.assertEqual(hook.get_key('a', 'bucket').key, 'a')
+        self.assertEqual(hook.get_key('s3://bucket/a').key, 'a')
 
     @mock_s3
     def test_read_key(self):
@@ -70,5 +131,59 @@ class TestS3Hook(unittest.TestCase):
         self.assertEqual(hook.read_key('my_key', 'mybucket'), u'Contént')
 
 
+    @mock_s3
+    def test_check_for_wildcard_key(self):
+        hook = S3Hook(aws_conn_id=None)
+        b = hook.get_bucket('bucket')
+        b.create()
+        b.put_object(Key='abc', Body=b'a')
+        b.put_object(Key='a/b', Body=b'a')
+
+        self.assertTrue(hook.check_for_wildcard_key('a*', 'bucket'))
+        self.assertTrue(hook.check_for_wildcard_key('s3://bucket//a*'))
+        self.assertTrue(hook.check_for_wildcard_key('abc', 'bucket'))
+        self.assertTrue(hook.check_for_wildcard_key('s3://bucket//abc'))
+        self.assertFalse(hook.check_for_wildcard_key('a', 'bucket'))
+        self.assertFalse(hook.check_for_wildcard_key('s3://bucket//a'))
+        self.assertFalse(hook.check_for_wildcard_key('b', 'bucket'))
+        self.assertFalse(hook.check_for_wildcard_key('s3://bucket//b'))
+
+    @mock_s3
+    def test_get_wildcard_key(self):
+        hook = S3Hook(aws_conn_id=None)
+        b = hook.get_bucket('bucket')
+        b.create()
+        b.put_object(Key='abc', Body=b'a')
+        b.put_object(Key='a/b', Body=b'a')
+
+        # The boto3 Class API is _odd_, and we can't do an isinstance check as
+        # each instance is a different class, so lets just check one property
+        # on S3.Object. Not great but...
+        self.assertEqual(hook.get_wildcard_key('a*', 'bucket').key, 'a/b')
+        self.assertEqual(hook.get_wildcard_key('s3://bucket/a*').key, 'a/b')
+        self.assertEqual(hook.get_wildcard_key('a*', 'bucket', delimiter='/').key, 'abc')
+        self.assertEqual(hook.get_wildcard_key('s3://bucket/a*', delimiter='/').key, 'abc')
+        self.assertEqual(hook.get_wildcard_key('abc', 'bucket', delimiter='/').key, 'abc')
+        self.assertEqual(hook.get_wildcard_key('s3://bucket/abc', delimiter='/').key, 'abc')
+
+        self.assertIsNone(hook.get_wildcard_key('a', 'bucket'))
+        self.assertIsNone(hook.get_wildcard_key('s3://bucket/a'))
+        self.assertIsNone(hook.get_wildcard_key('b', 'bucket'))
+        self.assertIsNone(hook.get_wildcard_key('s3://bucket/b'))
+
+    @mock_s3
+    def test_load_string(self):
+        hook = S3Hook(aws_conn_id=None)
+        conn = hook.get_conn()
+        # We need to create the bucket since this is all in Moto's 'virtual'
+        # AWS account
+        conn.create_bucket(Bucket="mybucket")
+
+        hook.load_string(u"Contént", "my_key", "mybucket")
+        body = boto3.resource('s3').Object('mybucket', 'my_key').get()['Body'].read()
+
+        self.assertEqual(body, b'Cont\xC3\xA9nt')
+
+
 if __name__ == '__main__':
     unittest.main()