You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/03/11 12:48:28 UTC
[airflow] branch main updated: Switch unit tests for oss operator and oss sensor in alibaba provider to use mocks (#17617) (#22178)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 03d0c70 Switch unit tests for oss operator and oss sensor in alibaba provider to use mocks (#17617) (#22178)
03d0c70 is described below
commit 03d0c702cf5bb72dcb129b86c219cbe59fd7548b
Author: Eric Gao <er...@gmail.com>
AuthorDate: Fri Mar 11 20:47:45 2022 +0800
Switch unit tests for oss operator and oss sensor in alibaba provider to use mocks (#17617) (#22178)
---
.../providers/alibaba/cloud/operators/test_oss.py | 163 ++++++++++-----------
.../alibaba/cloud/sensors/test_oss_key.py | 102 ++++++-------
2 files changed, 122 insertions(+), 143 deletions(-)
diff --git a/tests/providers/alibaba/cloud/operators/test_oss.py b/tests/providers/alibaba/cloud/operators/test_oss.py
index af4b2c5..ec556a5 100644
--- a/tests/providers/alibaba/cloud/operators/test_oss.py
+++ b/tests/providers/alibaba/cloud/operators/test_oss.py
@@ -16,13 +16,9 @@
# specific language governing permissions and limitations
# under the License.
#
-import os
import unittest
+from unittest import mock
-import oss2
-
-from airflow.exceptions import AirflowException
-from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
from airflow.providers.alibaba.cloud.operators.oss import (
OSSCreateBucketOperator,
OSSDeleteBatchObjectOperator,
@@ -31,96 +27,99 @@ from airflow.providers.alibaba.cloud.operators.oss import (
OSSDownloadObjectOperator,
OSSUploadObjectOperator,
)
-from tests.providers.alibaba.cloud.utils.test_utils import skip_test_if_no_valid_conn_id
-TEST_CONN_ID = os.environ.get('TEST_OSS_CONN_ID', 'oss_default')
-TEST_REGION = os.environ.get('TEST_OSS_REGION', 'us-east-1')
-TEST_BUCKET = os.environ.get('TEST_OSS_BUCKET', 'test-bucket')
-TEST_FILE_PATH = '/tmp/airflow-test'
+MOCK_TASK_ID = "test-oss-operator"
+MOCK_REGION = "mock_region"
+MOCK_BUCKET = "mock_bucket_name"
+MOCK_OSS_CONN_ID = "mock_oss_conn_default"
+MOCK_KEY = "mock_key"
+MOCK_KEYS = ["mock_key1", "mock_key_2", "mock_key3"]
+MOCK_CONTENT = "mock_content"
-class TestOSSOperator(unittest.TestCase):
- def setUp(self):
- self.create_bucket_operator = OSSCreateBucketOperator(
- oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-1'
- )
- self.delete_bucket_operator = OSSDeleteBucketOperator(
- oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-2'
+class TestOSSCreateBucketOperator(unittest.TestCase):
+ @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+ def test_execute(self, mock_hook):
+ operator = OSSCreateBucketOperator(
+ task_id=MOCK_TASK_ID, region=MOCK_REGION, bucket_name=MOCK_BUCKET, oss_conn_id=MOCK_OSS_CONN_ID
)
- try:
- self.hook = OSSHook(region=TEST_REGION)
- self.hook.object_exists(key='test-obj', bucket_name=TEST_BUCKET)
- except AirflowException:
- self.hook = None
- except oss2.exceptions.ServerError as e:
- if e.status == 403:
- self.hook = None
+ operator.execute(None)
+ mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+ mock_hook.return_value.create_bucket.assert_called_once_with(bucket_name=MOCK_BUCKET)
- @skip_test_if_no_valid_conn_id
- def test_init(self):
- assert self.create_bucket_operator.oss_conn_id == TEST_CONN_ID
- @skip_test_if_no_valid_conn_id
- def test_create_delete_bucket(self):
- self.create_bucket_operator.execute({})
- self.delete_bucket_operator.execute({})
+class TestOSSDeleteBucketOperator(unittest.TestCase):
+ @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+ def test_execute(self, mock_hook):
+ operator = OSSDeleteBucketOperator(
+ task_id=MOCK_TASK_ID, region=MOCK_REGION, bucket_name=MOCK_BUCKET, oss_conn_id=MOCK_OSS_CONN_ID
+ )
+ operator.execute(None)
+ mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+ mock_hook.return_value.delete_bucket.assert_called_once_with(bucket_name=MOCK_BUCKET)
- @skip_test_if_no_valid_conn_id
- def test_object(self):
- self.create_bucket_operator.execute({})
- upload_file = f'{TEST_FILE_PATH}_upload_1'
- if not os.path.exists(upload_file):
- with open(upload_file, 'w') as f:
- f.write('test')
- upload_object_operator = OSSUploadObjectOperator(
- key='obj',
- file=upload_file,
- oss_conn_id=TEST_CONN_ID,
- region=TEST_REGION,
- bucket_name=TEST_BUCKET,
- task_id='task-1',
+class TestOSSUploadObjectOperator(unittest.TestCase):
+ @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+ def test_execute(self, mock_hook):
+ operator = OSSUploadObjectOperator(
+ task_id=MOCK_TASK_ID,
+ region=MOCK_REGION,
+ bucket_name=MOCK_BUCKET,
+ oss_conn_id=MOCK_OSS_CONN_ID,
+ key=MOCK_KEY,
+ file=MOCK_CONTENT,
)
- upload_object_operator.execute({})
- assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET)
-
- download_file = f'{TEST_FILE_PATH}_download_1'
- download_object_operator = OSSDownloadObjectOperator(
- key='obj',
- file=download_file,
- oss_conn_id=TEST_CONN_ID,
- region=TEST_REGION,
- bucket_name=TEST_BUCKET,
- task_id='task-2',
+ operator.execute(None)
+ mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+ mock_hook.return_value.upload_local_file.assert_called_once_with(
+ bucket_name=MOCK_BUCKET, key=MOCK_KEY, file=MOCK_CONTENT
)
- download_object_operator.execute({})
- assert os.path.exists(download_file)
- delete_object_operator = OSSDeleteObjectOperator(
- key='obj', oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-3'
- )
- delete_object_operator.execute({})
- assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET) is False
- upload_object_operator = OSSUploadObjectOperator(
- key='obj',
- file=upload_file,
- oss_conn_id=TEST_CONN_ID,
- region=TEST_REGION,
- bucket_name=TEST_BUCKET,
- task_id='task-4',
+class TestOSSDownloadObjectOperator(unittest.TestCase):
+ @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+ def test_execute(self, mock_hook):
+ operator = OSSDownloadObjectOperator(
+ task_id=MOCK_TASK_ID,
+ region=MOCK_REGION,
+ bucket_name=MOCK_BUCKET,
+ oss_conn_id=MOCK_OSS_CONN_ID,
+ key=MOCK_KEY,
+ file=MOCK_CONTENT,
)
- upload_object_operator.execute({})
- assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET)
+ operator.execute(None)
+ mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+ mock_hook.return_value.download_file.assert_called_once_with(
+ bucket_name=MOCK_BUCKET, key=MOCK_KEY, local_file=MOCK_CONTENT
+ )
+
- delete_objects_operator = OSSDeleteBatchObjectOperator(
- keys=['obj'],
- oss_conn_id=TEST_CONN_ID,
- region=TEST_REGION,
- bucket_name=TEST_BUCKET,
- task_id='task-5',
+class TestOSSDeleteBatchObjectOperator(unittest.TestCase):
+ @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+ def test_execute(self, mock_hook):
+ operator = OSSDeleteBatchObjectOperator(
+ task_id=MOCK_TASK_ID,
+ region=MOCK_REGION,
+ bucket_name=MOCK_BUCKET,
+ oss_conn_id=MOCK_OSS_CONN_ID,
+ keys=MOCK_KEYS,
)
- delete_objects_operator.execute({})
- assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET) is False
+ operator.execute(None)
+ mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+ mock_hook.return_value.delete_objects.assert_called_once_with(bucket_name=MOCK_BUCKET, key=MOCK_KEYS)
- self.delete_bucket_operator.execute({})
+
+class TestOSSDeleteObjectOperator(unittest.TestCase):
+ @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+ def test_execute(self, mock_hook):
+ operator = OSSDeleteObjectOperator(
+ task_id=MOCK_TASK_ID,
+ region=MOCK_REGION,
+ bucket_name=MOCK_BUCKET,
+ oss_conn_id=MOCK_OSS_CONN_ID,
+ key=MOCK_KEY,
+ )
+ operator.execute(None)
+ mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+ mock_hook.return_value.delete_object.assert_called_once_with(bucket_name=MOCK_BUCKET, key=MOCK_KEY)
diff --git a/tests/providers/alibaba/cloud/sensors/test_oss_key.py b/tests/providers/alibaba/cloud/sensors/test_oss_key.py
index 1273e6d..48e98d9 100644
--- a/tests/providers/alibaba/cloud/sensors/test_oss_key.py
+++ b/tests/providers/alibaba/cloud/sensors/test_oss_key.py
@@ -16,78 +16,58 @@
# specific language governing permissions and limitations
# under the License.
#
-import os
-import unittest
-import oss2
+import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
-from airflow.exceptions import AirflowException
-from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
-from airflow.providers.alibaba.cloud.operators.oss import (
- OSSCreateBucketOperator,
- OSSDeleteBucketOperator,
- OSSDeleteObjectOperator,
- OSSUploadObjectOperator,
-)
from airflow.providers.alibaba.cloud.sensors.oss_key import OSSKeySensor
-from tests.providers.alibaba.cloud.utils.test_utils import skip_test_if_no_valid_conn_id
-TEST_CONN_ID = os.environ.get('TEST_OSS_CONN_ID', 'oss_default')
-TEST_REGION = os.environ.get('TEST_OSS_REGION', 'us-east-1')
-TEST_BUCKET = os.environ.get('TEST_OSS_BUCKET', 'test-bucket')
-TEST_FILE_PATH = '/tmp/airflow-test'
+OSS_SENSOR_STRING = 'airflow.providers.alibaba.cloud.sensors.oss_key.{}'
+MOCK_TASK_ID = "test-oss-operator"
+MOCK_REGION = "mock_region"
+MOCK_BUCKET = "mock_bucket_name"
+MOCK_OSS_CONN_ID = "mock_oss_conn_default"
+MOCK_KEY = "mock_key"
+MOCK_KEYS = ["mock_key1", "mock_key_2", "mock_key3"]
+MOCK_CONTENT = "mock_content"
-class TestOSSSensor(unittest.TestCase):
+class TestOSSKeySensor(unittest.TestCase):
def setUp(self):
self.sensor = OSSKeySensor(
- bucket_key='obj',
- oss_conn_id=TEST_CONN_ID,
- region=TEST_REGION,
- bucket_name=TEST_BUCKET,
- task_id='task-1',
+ bucket_key=MOCK_KEY,
+ oss_conn_id=MOCK_OSS_CONN_ID,
+ region=MOCK_REGION,
+ bucket_name=MOCK_BUCKET,
+ task_id=MOCK_TASK_ID,
)
- try:
- self.hook = OSSHook(region=TEST_REGION, oss_conn_id=TEST_CONN_ID)
- self.hook.object_exists(key='test-obj', bucket_name=TEST_BUCKET)
- except AirflowException:
- self.hook = None
- except oss2.exceptions.ServerError as e:
- if e.status == 403:
- self.hook = None
- @skip_test_if_no_valid_conn_id
- def test_init(self):
- assert self.sensor.oss_conn_id == TEST_CONN_ID
+ @mock.patch(OSS_SENSOR_STRING.format("OSSHook"))
+ def test_get_hook(self, mock_service):
+ self.sensor.get_hook()
+ mock_service.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
- @skip_test_if_no_valid_conn_id
- def test_poke(self):
- create_bucket_operator = OSSCreateBucketOperator(
- oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-2'
- )
- create_bucket_operator.execute({})
+ @mock.patch(OSS_SENSOR_STRING.format("OSSKeySensor.get_hook"), new_callable=PropertyMock)
+ def test_poke_exsiting_key(self, mock_service):
+ # Given
+ mock_service.return_value.object_exists.return_value = True
- upload_file = f'{TEST_FILE_PATH}_upload_1'
- if not os.path.exists(upload_file):
- with open(upload_file, 'w') as f:
- f.write('test')
- upload_object_operator = OSSUploadObjectOperator(
- key='obj',
- file=upload_file,
- oss_conn_id=TEST_CONN_ID,
- region=TEST_REGION,
- bucket_name=TEST_BUCKET,
- task_id='task-3',
- )
- upload_object_operator.execute({})
- assert self.sensor.poke({})
+ # When
+ res = self.sensor.poke(None)
- delete_object_operator = OSSDeleteObjectOperator(
- key='obj', oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-4'
- )
- delete_object_operator.execute({})
+ # Then
+ assert res is True
+ mock_service.return_value.object_exists.assert_called_once_with(key=MOCK_KEY, bucket_name=MOCK_BUCKET)
- delete_bucket_operator = OSSDeleteBucketOperator(
- oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-5'
- )
- delete_bucket_operator.execute({})
+ @mock.patch(OSS_SENSOR_STRING.format("OSSKeySensor.get_hook"), new_callable=PropertyMock)
+ def test_poke_non_exsiting_key(self, mock_service):
+ # Given
+ mock_service.return_value.object_exists.return_value = False
+
+ # When
+ res = self.sensor.poke(None)
+
+ # Then
+ assert res is False
+ mock_service.return_value.object_exists.assert_called_once_with(key=MOCK_KEY, bucket_name=MOCK_BUCKET)