You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/09/07 18:34:31 UTC
[airflow] branch main updated: fix(providers/alibaba): respect soft_fail argument when exception is raised (#34157)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7696e416ae fix(providers/alibaba): respect soft_fail argument when exception is raised (#34157)
7696e416ae is described below
commit 7696e416aef95295bdb360a8bf5ce68f9fd41e3e
Author: Wei Lee <we...@gmail.com>
AuthorDate: Fri Sep 8 02:34:24 2023 +0800
fix(providers/alibaba): respect soft_fail argument when exception is raised (#34157)
---
airflow/providers/alibaba/cloud/sensors/oss_key.py | 14 +++--
.../alibaba/cloud/sensors/test_oss_key.py | 62 +++++++++++++++-------
2 files changed, 54 insertions(+), 22 deletions(-)
diff --git a/airflow/providers/alibaba/cloud/sensors/oss_key.py b/airflow/providers/alibaba/cloud/sensors/oss_key.py
index f57df79fce..3b88eb65c6 100644
--- a/airflow/providers/alibaba/cloud/sensors/oss_key.py
+++ b/airflow/providers/alibaba/cloud/sensors/oss_key.py
@@ -21,7 +21,7 @@ from functools import cached_property
from typing import TYPE_CHECKING, Sequence
from urllib.parse import urlsplit
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
from airflow.sensors.base import BaseSensorOperator
@@ -72,16 +72,24 @@ class OSSKeySensor(BaseSensorOperator):
parsed_url = urlsplit(self.bucket_key)
if self.bucket_name is None:
if parsed_url.netloc == "":
- raise AirflowException("If key is a relative path from root, please provide a bucket_name")
+ # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+ message = "If key is a relative path from root, please provide a bucket_name"
+ if self.soft_fail:
+ raise AirflowSkipException(message)
+ raise AirflowException(message)
self.bucket_name = parsed_url.netloc
self.bucket_key = parsed_url.path.lstrip("/")
else:
if parsed_url.scheme != "" or parsed_url.netloc != "":
- raise AirflowException(
+ # TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
+ message = (
"If bucket_name is provided, bucket_key"
" should be relative path from root"
" level, rather than a full oss:// url"
)
+ if self.soft_fail:
+ raise AirflowSkipException(message)
+ raise AirflowException(message)
self.log.info("Poking for key : oss://%s/%s", self.bucket_name, self.bucket_key)
return self.get_hook.object_exists(key=self.bucket_key, bucket_name=self.bucket_name)
diff --git a/tests/providers/alibaba/cloud/sensors/test_oss_key.py b/tests/providers/alibaba/cloud/sensors/test_oss_key.py
index 4304f37a52..388a57e50e 100644
--- a/tests/providers/alibaba/cloud/sensors/test_oss_key.py
+++ b/tests/providers/alibaba/cloud/sensors/test_oss_key.py
@@ -20,9 +20,13 @@ from __future__ import annotations
from unittest import mock
from unittest.mock import PropertyMock
+import pytest
+
+from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.alibaba.cloud.sensors.oss_key import OSSKeySensor
-OSS_SENSOR_STRING = "airflow.providers.alibaba.cloud.sensors.oss_key.{}"
+MODULE_NAME = "airflow.providers.alibaba.cloud.sensors.oss_key"
+
MOCK_TASK_ID = "test-oss-operator"
MOCK_REGION = "mock_region"
MOCK_BUCKET = "mock_bucket_name"
@@ -32,41 +36,61 @@ MOCK_KEYS = ["mock_key1", "mock_key_2", "mock_key3"]
MOCK_CONTENT = "mock_content"
+@pytest.fixture
+def oss_key_sensor():
+ return OSSKeySensor(
+ bucket_key=MOCK_KEY,
+ oss_conn_id=MOCK_OSS_CONN_ID,
+ region=MOCK_REGION,
+ bucket_name=MOCK_BUCKET,
+ task_id=MOCK_TASK_ID,
+ )
+
+
class TestOSSKeySensor:
- def setup_method(self):
- self.sensor = OSSKeySensor(
- bucket_key=MOCK_KEY,
- oss_conn_id=MOCK_OSS_CONN_ID,
- region=MOCK_REGION,
- bucket_name=MOCK_BUCKET,
- task_id=MOCK_TASK_ID,
- )
-
- @mock.patch(OSS_SENSOR_STRING.format("OSSHook"))
- def test_get_hook(self, mock_service):
- self.sensor.get_hook()
+ @mock.patch(f"{MODULE_NAME}.OSSHook")
+ def test_get_hook(self, mock_service, oss_key_sensor):
+ oss_key_sensor.get_hook()
mock_service.assert_called_once_with(oss_conn_id=MOCK_OSS_CONN_ID, region=MOCK_REGION)
- @mock.patch(OSS_SENSOR_STRING.format("OSSKeySensor.get_hook"), new_callable=PropertyMock)
- def test_poke_exsiting_key(self, mock_service):
+ @mock.patch(f"{MODULE_NAME}.OSSKeySensor.get_hook", new_callable=PropertyMock)
+ def test_poke_exsiting_key(self, mock_service, oss_key_sensor):
# Given
mock_service.return_value.object_exists.return_value = True
# When
- res = self.sensor.poke(None)
+ res = oss_key_sensor.poke(None)
# Then
assert res is True
mock_service.return_value.object_exists.assert_called_once_with(key=MOCK_KEY, bucket_name=MOCK_BUCKET)
- @mock.patch(OSS_SENSOR_STRING.format("OSSKeySensor.get_hook"), new_callable=PropertyMock)
- def test_poke_non_exsiting_key(self, mock_service):
+ @mock.patch(f"{MODULE_NAME}.OSSKeySensor.get_hook", new_callable=PropertyMock)
+ def test_poke_non_exsiting_key(self, mock_service, oss_key_sensor):
# Given
mock_service.return_value.object_exists.return_value = False
# When
- res = self.sensor.poke(None)
+ res = oss_key_sensor.poke(None)
# Then
assert res is False
mock_service.return_value.object_exists.assert_called_once_with(key=MOCK_KEY, bucket_name=MOCK_BUCKET)
+
+ @pytest.mark.parametrize(
+ "soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
+ )
+ @mock.patch(f"{MODULE_NAME}.OSSKeySensor.get_hook", new_callable=PropertyMock)
+ def test_poke_without_bucket_name(
+ self, mock_service, oss_key_sensor, soft_fail: bool, expected_exception: AirflowException
+ ):
+ # Given
+ oss_key_sensor.soft_fail = soft_fail
+ oss_key_sensor.bucket_name = None
+ mock_service.return_value.object_exists.return_value = False
+
+ # When, Then
+ with pytest.raises(
+ expected_exception, match="If key is a relative path from root, please provide a bucket_name"
+ ):
+ oss_key_sensor.poke(None)