You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/03/11 12:48:28 UTC

[airflow] branch main updated: Switch unit tests for oss operator and oss sensor in alibaba provider to use mocks (#17617) (#22178)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 03d0c70  Switch unit tests for oss operator and oss sensor in alibaba provider to use mocks (#17617) (#22178)
03d0c70 is described below

commit 03d0c702cf5bb72dcb129b86c219cbe59fd7548b
Author: Eric Gao <er...@gmail.com>
AuthorDate: Fri Mar 11 20:47:45 2022 +0800

    Switch unit tests for oss operator and oss sensor in alibaba provider to use mocks (#17617) (#22178)
---
 .../providers/alibaba/cloud/operators/test_oss.py  | 163 ++++++++++-----------
 .../alibaba/cloud/sensors/test_oss_key.py          | 102 ++++++-------
 2 files changed, 122 insertions(+), 143 deletions(-)

diff --git a/tests/providers/alibaba/cloud/operators/test_oss.py b/tests/providers/alibaba/cloud/operators/test_oss.py
index af4b2c5..ec556a5 100644
--- a/tests/providers/alibaba/cloud/operators/test_oss.py
+++ b/tests/providers/alibaba/cloud/operators/test_oss.py
@@ -16,13 +16,9 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import os
 import unittest
+from unittest import mock
 
-import oss2
-
-from airflow.exceptions import AirflowException
-from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
 from airflow.providers.alibaba.cloud.operators.oss import (
     OSSCreateBucketOperator,
     OSSDeleteBatchObjectOperator,
@@ -31,96 +27,99 @@ from airflow.providers.alibaba.cloud.operators.oss import (
     OSSDownloadObjectOperator,
     OSSUploadObjectOperator,
 )
-from tests.providers.alibaba.cloud.utils.test_utils import skip_test_if_no_valid_conn_id
 
-TEST_CONN_ID = os.environ.get('TEST_OSS_CONN_ID', 'oss_default')
-TEST_REGION = os.environ.get('TEST_OSS_REGION', 'us-east-1')
-TEST_BUCKET = os.environ.get('TEST_OSS_BUCKET', 'test-bucket')
-TEST_FILE_PATH = '/tmp/airflow-test'
+MOCK_TASK_ID = "test-oss-operator"
+MOCK_REGION = "mock_region"
+MOCK_BUCKET = "mock_bucket_name"
+MOCK_OSS_CONN_ID = "mock_oss_conn_default"
+MOCK_KEY = "mock_key"
+MOCK_KEYS = ["mock_key1", "mock_key_2", "mock_key3"]
+MOCK_CONTENT = "mock_content"
 
 
-class TestOSSOperator(unittest.TestCase):
-    def setUp(self):
-        self.create_bucket_operator = OSSCreateBucketOperator(
-            oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-1'
-        )
-        self.delete_bucket_operator = OSSDeleteBucketOperator(
-            oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-2'
+class TestOSSCreateBucketOperator(unittest.TestCase):
+    @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+    def test_execute(self, mock_hook):
+        operator = OSSCreateBucketOperator(
+            task_id=MOCK_TASK_ID, region=MOCK_REGION, bucket_name=MOCK_BUCKET, oss_conn_id=MOCK_OSS_CONN_ID
         )
-        try:
-            self.hook = OSSHook(region=TEST_REGION)
-            self.hook.object_exists(key='test-obj', bucket_name=TEST_BUCKET)
-        except AirflowException:
-            self.hook = None
-        except oss2.exceptions.ServerError as e:
-            if e.status == 403:
-                self.hook = None
+        operator.execute(None)
+        mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+        mock_hook.return_value.create_bucket.assert_called_once_with(bucket_name=MOCK_BUCKET)
 
-    @skip_test_if_no_valid_conn_id
-    def test_init(self):
-        assert self.create_bucket_operator.oss_conn_id == TEST_CONN_ID
 
-    @skip_test_if_no_valid_conn_id
-    def test_create_delete_bucket(self):
-        self.create_bucket_operator.execute({})
-        self.delete_bucket_operator.execute({})
+class TestOSSDeleteBucketOperator(unittest.TestCase):
+    @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+    def test_execute(self, mock_hook):
+        operator = OSSDeleteBucketOperator(
+            task_id=MOCK_TASK_ID, region=MOCK_REGION, bucket_name=MOCK_BUCKET, oss_conn_id=MOCK_OSS_CONN_ID
+        )
+        operator.execute(None)
+        mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+        mock_hook.return_value.delete_bucket.assert_called_once_with(bucket_name=MOCK_BUCKET)
 
-    @skip_test_if_no_valid_conn_id
-    def test_object(self):
-        self.create_bucket_operator.execute({})
 
-        upload_file = f'{TEST_FILE_PATH}_upload_1'
-        if not os.path.exists(upload_file):
-            with open(upload_file, 'w') as f:
-                f.write('test')
-        upload_object_operator = OSSUploadObjectOperator(
-            key='obj',
-            file=upload_file,
-            oss_conn_id=TEST_CONN_ID,
-            region=TEST_REGION,
-            bucket_name=TEST_BUCKET,
-            task_id='task-1',
+class TestOSSUploadObjectOperator(unittest.TestCase):
+    @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+    def test_execute(self, mock_hook):
+        operator = OSSUploadObjectOperator(
+            task_id=MOCK_TASK_ID,
+            region=MOCK_REGION,
+            bucket_name=MOCK_BUCKET,
+            oss_conn_id=MOCK_OSS_CONN_ID,
+            key=MOCK_KEY,
+            file=MOCK_CONTENT,
         )
-        upload_object_operator.execute({})
-        assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET)
-
-        download_file = f'{TEST_FILE_PATH}_download_1'
-        download_object_operator = OSSDownloadObjectOperator(
-            key='obj',
-            file=download_file,
-            oss_conn_id=TEST_CONN_ID,
-            region=TEST_REGION,
-            bucket_name=TEST_BUCKET,
-            task_id='task-2',
+        operator.execute(None)
+        mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+        mock_hook.return_value.upload_local_file.assert_called_once_with(
+            bucket_name=MOCK_BUCKET, key=MOCK_KEY, file=MOCK_CONTENT
         )
-        download_object_operator.execute({})
-        assert os.path.exists(download_file)
 
-        delete_object_operator = OSSDeleteObjectOperator(
-            key='obj', oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-3'
-        )
-        delete_object_operator.execute({})
-        assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET) is False
 
-        upload_object_operator = OSSUploadObjectOperator(
-            key='obj',
-            file=upload_file,
-            oss_conn_id=TEST_CONN_ID,
-            region=TEST_REGION,
-            bucket_name=TEST_BUCKET,
-            task_id='task-4',
+class TestOSSDownloadObjectOperator(unittest.TestCase):
+    @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+    def test_execute(self, mock_hook):
+        operator = OSSDownloadObjectOperator(
+            task_id=MOCK_TASK_ID,
+            region=MOCK_REGION,
+            bucket_name=MOCK_BUCKET,
+            oss_conn_id=MOCK_OSS_CONN_ID,
+            key=MOCK_KEY,
+            file=MOCK_CONTENT,
         )
-        upload_object_operator.execute({})
-        assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET)
+        operator.execute(None)
+        mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+        mock_hook.return_value.download_file.assert_called_once_with(
+            bucket_name=MOCK_BUCKET, key=MOCK_KEY, local_file=MOCK_CONTENT
+        )
+
 
-        delete_objects_operator = OSSDeleteBatchObjectOperator(
-            keys=['obj'],
-            oss_conn_id=TEST_CONN_ID,
-            region=TEST_REGION,
-            bucket_name=TEST_BUCKET,
-            task_id='task-5',
+class TestOSSDeleteBatchObjectOperator(unittest.TestCase):
+    @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+    def test_execute(self, mock_hook):
+        operator = OSSDeleteBatchObjectOperator(
+            task_id=MOCK_TASK_ID,
+            region=MOCK_REGION,
+            bucket_name=MOCK_BUCKET,
+            oss_conn_id=MOCK_OSS_CONN_ID,
+            keys=MOCK_KEYS,
         )
-        delete_objects_operator.execute({})
-        assert self.hook.object_exists(key='obj', bucket_name=TEST_BUCKET) is False
+        operator.execute(None)
+        mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+        mock_hook.return_value.delete_objects.assert_called_once_with(bucket_name=MOCK_BUCKET, key=MOCK_KEYS)
 
-        self.delete_bucket_operator.execute({})
+
+class TestOSSDeleteObjectOperator(unittest.TestCase):
+    @mock.patch("airflow.providers.alibaba.cloud.operators.oss.OSSHook")
+    def test_execute(self, mock_hook):
+        operator = OSSDeleteObjectOperator(
+            task_id=MOCK_TASK_ID,
+            region=MOCK_REGION,
+            bucket_name=MOCK_BUCKET,
+            oss_conn_id=MOCK_OSS_CONN_ID,
+            key=MOCK_KEY,
+        )
+        operator.execute(None)
+        mock_hook.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
+        mock_hook.return_value.delete_object.assert_called_once_with(bucket_name=MOCK_BUCKET, key=MOCK_KEY)
diff --git a/tests/providers/alibaba/cloud/sensors/test_oss_key.py b/tests/providers/alibaba/cloud/sensors/test_oss_key.py
index 1273e6d..48e98d9 100644
--- a/tests/providers/alibaba/cloud/sensors/test_oss_key.py
+++ b/tests/providers/alibaba/cloud/sensors/test_oss_key.py
@@ -16,78 +16,58 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import os
-import unittest
 
-import oss2
+import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-from airflow.exceptions import AirflowException
-from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
-from airflow.providers.alibaba.cloud.operators.oss import (
-    OSSCreateBucketOperator,
-    OSSDeleteBucketOperator,
-    OSSDeleteObjectOperator,
-    OSSUploadObjectOperator,
-)
 from airflow.providers.alibaba.cloud.sensors.oss_key import OSSKeySensor
-from tests.providers.alibaba.cloud.utils.test_utils import skip_test_if_no_valid_conn_id
 
-TEST_CONN_ID = os.environ.get('TEST_OSS_CONN_ID', 'oss_default')
-TEST_REGION = os.environ.get('TEST_OSS_REGION', 'us-east-1')
-TEST_BUCKET = os.environ.get('TEST_OSS_BUCKET', 'test-bucket')
-TEST_FILE_PATH = '/tmp/airflow-test'
+OSS_SENSOR_STRING = 'airflow.providers.alibaba.cloud.sensors.oss_key.{}'
+MOCK_TASK_ID = "test-oss-operator"
+MOCK_REGION = "mock_region"
+MOCK_BUCKET = "mock_bucket_name"
+MOCK_OSS_CONN_ID = "mock_oss_conn_default"
+MOCK_KEY = "mock_key"
+MOCK_KEYS = ["mock_key1", "mock_key_2", "mock_key3"]
+MOCK_CONTENT = "mock_content"
 
 
-class TestOSSSensor(unittest.TestCase):
+class TestOSSKeySensor(unittest.TestCase):
     def setUp(self):
         self.sensor = OSSKeySensor(
-            bucket_key='obj',
-            oss_conn_id=TEST_CONN_ID,
-            region=TEST_REGION,
-            bucket_name=TEST_BUCKET,
-            task_id='task-1',
+            bucket_key=MOCK_KEY,
+            oss_conn_id=MOCK_OSS_CONN_ID,
+            region=MOCK_REGION,
+            bucket_name=MOCK_BUCKET,
+            task_id=MOCK_TASK_ID,
         )
-        try:
-            self.hook = OSSHook(region=TEST_REGION, oss_conn_id=TEST_CONN_ID)
-            self.hook.object_exists(key='test-obj', bucket_name=TEST_BUCKET)
-        except AirflowException:
-            self.hook = None
-        except oss2.exceptions.ServerError as e:
-            if e.status == 403:
-                self.hook = None
 
-    @skip_test_if_no_valid_conn_id
-    def test_init(self):
-        assert self.sensor.oss_conn_id == TEST_CONN_ID
+    @mock.patch(OSS_SENSOR_STRING.format("OSSHook"))
+    def test_get_hook(self, mock_service):
+        self.sensor.get_hook()
+        mock_service.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
 
-    @skip_test_if_no_valid_conn_id
-    def test_poke(self):
-        create_bucket_operator = OSSCreateBucketOperator(
-            oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-2'
-        )
-        create_bucket_operator.execute({})
+    @mock.patch(OSS_SENSOR_STRING.format("OSSKeySensor.get_hook"), new_callable=PropertyMock)
+    def test_poke_exsiting_key(self, mock_service):
+        # Given
+        mock_service.return_value.object_exists.return_value = True
 
-        upload_file = f'{TEST_FILE_PATH}_upload_1'
-        if not os.path.exists(upload_file):
-            with open(upload_file, 'w') as f:
-                f.write('test')
-        upload_object_operator = OSSUploadObjectOperator(
-            key='obj',
-            file=upload_file,
-            oss_conn_id=TEST_CONN_ID,
-            region=TEST_REGION,
-            bucket_name=TEST_BUCKET,
-            task_id='task-3',
-        )
-        upload_object_operator.execute({})
-        assert self.sensor.poke({})
+        # When
+        res = self.sensor.poke(None)
 
-        delete_object_operator = OSSDeleteObjectOperator(
-            key='obj', oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-4'
-        )
-        delete_object_operator.execute({})
+        # Then
+        assert res is True
+        mock_service.return_value.object_exists.assert_called_once_with(key=MOCK_KEY, bucket_name=MOCK_BUCKET)
 
-        delete_bucket_operator = OSSDeleteBucketOperator(
-            oss_conn_id=TEST_CONN_ID, region=TEST_REGION, bucket_name=TEST_BUCKET, task_id='task-5'
-        )
-        delete_bucket_operator.execute({})
+    @mock.patch(OSS_SENSOR_STRING.format("OSSKeySensor.get_hook"), new_callable=PropertyMock)
+    def test_poke_non_exsiting_key(self, mock_service):
+        # Given
+        mock_service.return_value.object_exists.return_value = False
+
+        # When
+        res = self.sensor.poke(None)
+
+        # Then
+        assert res is False
+        mock_service.return_value.object_exists.assert_called_once_with(key=MOCK_KEY, bucket_name=MOCK_BUCKET)