You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/11/22 14:03:26 UTC
incubator-airflow git commit: [AIRFLOW-1839] Fix more bugs in S3Hook
boto -> boto3 migration
Repository: incubator-airflow
Updated Branches:
refs/heads/master 6b1ceff7d -> 2d5408935
[AIRFLOW-1839] Fix more bugs in S3Hook boto -> boto3 migration
There were some more bugs as a result of the boto
to boto3 migration
that weren't covered by existing tests. Now they
are fixed, and covered.
Hopefully I got everything this time.
Closes #2805 from ashb/AIRFLOW-1839-s3-hook_loadsa-tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d540893
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d540893
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d540893
Branch: refs/heads/master
Commit: 2d5408935fc41c3d3b6618d8c563d1eecac06561
Parents: 6b1ceff
Author: Ash Berlin-Taylor <as...@firemirror.com>
Authored: Wed Nov 22 15:03:15 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Nov 22 15:03:15 2017 +0100
----------------------------------------------------------------------
airflow/hooks/S3_hook.py | 22 +++----
tests/hooks/test_s3_hook.py | 131 ++++++++++++++++++++++++++++++++++++---
2 files changed, 134 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d540893/airflow/hooks/S3_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index 226b520..717ba33 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -58,7 +58,7 @@ class S3Hook(AwsHook):
:param bucket_name: the name of the bucket
:type bucket_name: str
"""
- s3 = self.get_resource('s3')
+ s3 = self.get_resource_type('s3')
return s3.Bucket(bucket_name)
def check_for_prefix(self, bucket_name, prefix, delimiter):
@@ -69,7 +69,7 @@ class S3Hook(AwsHook):
prefix_split = re.split(r'(\w+[{d}])$'.format(d=delimiter), prefix, 1)
previous_level = prefix_split[0]
plist = self.list_prefixes(bucket_name, previous_level, delimiter)
- return False if plist is None else prefix in plist
+ return False if plist is None else prefix in plist
def list_prefixes(self, bucket_name, prefix='', delimiter=''):
"""
@@ -85,7 +85,7 @@ class S3Hook(AwsHook):
response = self.get_conn().list_objects_v2(Bucket=bucket_name,
Prefix=prefix,
Delimiter=delimiter)
- return [p.Prefix for p in response['CommonPrefixes']] if response.get('CommonPrefixes') else None
+ return [p['Prefix'] for p in response['CommonPrefixes']] if response.get('CommonPrefixes') else None
def list_keys(self, bucket_name, prefix='', delimiter=''):
"""
@@ -98,10 +98,10 @@ class S3Hook(AwsHook):
:param delimiter: the delimiter marks key hierarchy.
:type delimiter: str
"""
- response = self.get_conn().list_objects_v2(Bucket=bucket_name,
- Prefix=prefix,
+ response = self.get_conn().list_objects_v2(Bucket=bucket_name,
+ Prefix=prefix,
Delimiter=delimiter)
- return [k.Key for k in response['Contents']] if response.get('Contents') else None
+ return [k['Key'] for k in response['Contents']] if response.get('Contents') else None
def check_for_key(self, key, bucket_name=None):
"""
@@ -114,7 +114,7 @@ class S3Hook(AwsHook):
"""
if not bucket_name:
(bucket_name, key) = self.parse_s3_url(key)
-
+
try:
self.get_conn().head_object(Bucket=bucket_name, Key=key)
return True
@@ -170,7 +170,7 @@ class S3Hook(AwsHook):
"""
if not bucket_name:
(bucket_name, wildcard_key) = self.parse_s3_url(wildcard_key)
-
+
prefix = re.split(r'[*]', wildcard_key, 1)[0]
klist = self.list_keys(bucket_name, prefix=prefix, delimiter=delimiter)
if klist:
@@ -203,14 +203,14 @@ class S3Hook(AwsHook):
"""
if not bucket_name:
(bucket_name, key) = self.parse_s3_url(key)
-
+
if not replace and self.check_for_key(key, bucket_name):
raise ValueError("The key {key} already exists.".format(key=key))
-
+
extra_args={}
if encrypt:
extra_args['ServerSideEncryption'] = "AES256"
-
+
client = self.get_conn()
client.upload_file(filename, bucket_name, key, ExtraArgs=extra_args)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d540893/tests/hooks/test_s3_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_s3_hook.py b/tests/hooks/test_s3_hook.py
index 48c9fde..d392ea0 100644
--- a/tests/hooks/test_s3_hook.py
+++ b/tests/hooks/test_s3_hook.py
@@ -35,6 +35,7 @@ except ImportError:
@unittest.skipIf(mock_s3 is None,
"Skipping test because moto.mock_s3 is not available")
class TestS3Hook(unittest.TestCase):
+
def setUp(self):
configuration.load_test_config()
self.s3_test_url = "s3://test/this/is/not/a-real-key.txt"
@@ -44,19 +45,79 @@ class TestS3Hook(unittest.TestCase):
self.assertEqual(parsed,
("test", "this/is/not/a-real-key.txt"),
"Incorrect parsing of the s3 url")
+ @mock_s3
+ def test_check_for_bucket(self):
+ hook = S3Hook(aws_conn_id=None)
+ b = hook.get_bucket('bucket')
+ b.create()
+
+ self.assertTrue(hook.check_for_bucket('bucket'))
+ self.assertFalse(hook.check_for_bucket('not-a-bucket'))
@mock_s3
- def test_load_string(self):
+ def test_get_bucket(self):
hook = S3Hook(aws_conn_id=None)
- conn = hook.get_conn()
- # We need to create the bucket since this is all in Moto's 'virtual'
- # AWS account
- conn.create_bucket(Bucket="mybucket")
+ b = hook.get_bucket('bucket')
+ self.assertIsNotNone(b)
- hook.load_string(u"Contént", "my_key", "mybucket")
- body = boto3.resource('s3').Object('mybucket', 'my_key').get()['Body'].read()
+ @mock_s3
+ def test_check_for_prefix(self):
+ hook = S3Hook(aws_conn_id=None)
+ b = hook.get_bucket('bucket')
+ b.create()
+ b.put_object(Key='a', Body=b'a')
+ b.put_object(Key='dir/b', Body=b'b')
- self.assertEqual(body, b'Cont\xC3\xA9nt')
+ self.assertTrue(hook.check_for_prefix('bucket', prefix='dir/', delimiter='/'))
+ self.assertFalse(hook.check_for_prefix('bucket', prefix='a', delimiter='/'))
+
+ @mock_s3
+ def test_list_prefixes(self):
+ hook = S3Hook(aws_conn_id=None)
+ b = hook.get_bucket('bucket')
+ b.create()
+ b.put_object(Key='a', Body=b'a')
+ b.put_object(Key='dir/b', Body=b'b')
+
+ self.assertIsNone(hook.list_prefixes('bucket', prefix='non-existent/'))
+ self.assertListEqual(['dir/'], hook.list_prefixes('bucket', delimiter='/'))
+ self.assertListEqual(['a'], hook.list_keys('bucket', delimiter='/'))
+ self.assertListEqual(['dir/b'], hook.list_keys('bucket', prefix='dir/'))
+
+ @mock_s3
+ def test_list_keys(self):
+ hook = S3Hook(aws_conn_id=None)
+ b = hook.get_bucket('bucket')
+ b.create()
+ b.put_object(Key='a', Body=b'a')
+ b.put_object(Key='dir/b', Body=b'b')
+
+ self.assertIsNone(hook.list_keys('bucket', prefix='non-existent/'))
+ self.assertListEqual(['a', 'dir/b'], hook.list_keys('bucket'))
+ self.assertListEqual(['a'], hook.list_keys('bucket', delimiter='/'))
+ self.assertListEqual(['dir/b'], hook.list_keys('bucket', prefix='dir/'))
+
+ @mock_s3
+ def test_check_for_key(self):
+ hook = S3Hook(aws_conn_id=None)
+ b = hook.get_bucket('bucket')
+ b.create()
+ b.put_object(Key='a', Body=b'a')
+
+ self.assertTrue(hook.check_for_key('a', 'bucket'))
+ self.assertTrue(hook.check_for_key('s3://bucket//a'))
+ self.assertFalse(hook.check_for_key('b', 'bucket'))
+ self.assertFalse(hook.check_for_key('s3://bucket//b'))
+
+ @mock_s3
+ def test_get_key(self):
+ hook = S3Hook(aws_conn_id=None)
+ b = hook.get_bucket('bucket')
+ b.create()
+ b.put_object(Key='a', Body=b'a')
+
+ self.assertEqual(hook.get_key('a', 'bucket').key, 'a')
+ self.assertEqual(hook.get_key('s3://bucket/a').key, 'a')
@mock_s3
def test_read_key(self):
@@ -70,5 +131,59 @@ class TestS3Hook(unittest.TestCase):
self.assertEqual(hook.read_key('my_key', 'mybucket'), u'Contént')
+ @mock_s3
+ def test_check_for_wildcard_key(self):
+ hook = S3Hook(aws_conn_id=None)
+ b = hook.get_bucket('bucket')
+ b.create()
+ b.put_object(Key='abc', Body=b'a')
+ b.put_object(Key='a/b', Body=b'a')
+
+ self.assertTrue(hook.check_for_wildcard_key('a*', 'bucket'))
+ self.assertTrue(hook.check_for_wildcard_key('s3://bucket//a*'))
+ self.assertTrue(hook.check_for_wildcard_key('abc', 'bucket'))
+ self.assertTrue(hook.check_for_wildcard_key('s3://bucket//abc'))
+ self.assertFalse(hook.check_for_wildcard_key('a', 'bucket'))
+ self.assertFalse(hook.check_for_wildcard_key('s3://bucket//a'))
+ self.assertFalse(hook.check_for_wildcard_key('b', 'bucket'))
+ self.assertFalse(hook.check_for_wildcard_key('s3://bucket//b'))
+
+ @mock_s3
+ def test_get_wildcard_key(self):
+ hook = S3Hook(aws_conn_id=None)
+ b = hook.get_bucket('bucket')
+ b.create()
+ b.put_object(Key='abc', Body=b'a')
+ b.put_object(Key='a/b', Body=b'a')
+
+ # The boto3 Class API is _odd_, and we can't do an isinstance check as
+ # each instance is a different class, so lets just check one property
+ # on S3.Object. Not great but...
+ self.assertEqual(hook.get_wildcard_key('a*', 'bucket').key, 'a/b')
+ self.assertEqual(hook.get_wildcard_key('s3://bucket/a*').key, 'a/b')
+ self.assertEqual(hook.get_wildcard_key('a*', 'bucket', delimiter='/').key, 'abc')
+ self.assertEqual(hook.get_wildcard_key('s3://bucket/a*', delimiter='/').key, 'abc')
+ self.assertEqual(hook.get_wildcard_key('abc', 'bucket', delimiter='/').key, 'abc')
+ self.assertEqual(hook.get_wildcard_key('s3://bucket/abc', delimiter='/').key, 'abc')
+
+ self.assertIsNone(hook.get_wildcard_key('a', 'bucket'))
+ self.assertIsNone(hook.get_wildcard_key('s3://bucket/a'))
+ self.assertIsNone(hook.get_wildcard_key('b', 'bucket'))
+ self.assertIsNone(hook.get_wildcard_key('s3://bucket/b'))
+
+ @mock_s3
+ def test_load_string(self):
+ hook = S3Hook(aws_conn_id=None)
+ conn = hook.get_conn()
+ # We need to create the bucket since this is all in Moto's 'virtual'
+ # AWS account
+ conn.create_bucket(Bucket="mybucket")
+
+ hook.load_string(u"Contént", "my_key", "mybucket")
+ body = boto3.resource('s3').Object('mybucket', 'my_key').get()['Body'].read()
+
+ self.assertEqual(body, b'Cont\xC3\xA9nt')
+
+
if __name__ == '__main__':
unittest.main()