You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/12/04 17:36:46 UTC
[airflow] branch main updated: Add retry param in GCSObjectExistenceSensor (#27943)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5cdff50557 Add retry param in GCSObjectExistenceSensor (#27943)
5cdff50557 is described below
commit 5cdff505574822ad3d2a226056246500e4adea2f
Author: Pankaj Singh <98...@users.noreply.github.com>
AuthorDate: Sun Dec 4 23:06:39 2022 +0530
Add retry param in GCSObjectExistenceSensor (#27943)
---
airflow/providers/google/cloud/hooks/gcs.py | 7 +++++--
airflow/providers/google/cloud/sensors/gcs.py | 8 +++++++-
tests/providers/google/cloud/hooks/test_gcs.py | 3 ++-
tests/providers/google/cloud/sensors/test_gcs.py | 3 ++-
4 files changed, 16 insertions(+), 5 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py
index 7e2e081a46..68e1c74986 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -33,10 +33,12 @@ from typing import IO, Callable, Generator, Sequence, TypeVar, cast, overload
from urllib.parse import urlsplit
from google.api_core.exceptions import NotFound
+from google.api_core.retry import Retry
# not sure why but mypy complains on missing `storage` but it is clearly there and is importable
from google.cloud import storage # type: ignore[attr-defined]
from google.cloud.exceptions import GoogleCloudError
+from google.cloud.storage.retry import DEFAULT_RETRY
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.utils.helpers import normalize_directory_path
@@ -533,18 +535,19 @@ class GCSHook(GoogleBaseHook):
else:
raise ValueError("'filename' and 'data' parameter missing. One is required to upload to gcs.")
- def exists(self, bucket_name: str, object_name: str) -> bool:
+ def exists(self, bucket_name: str, object_name: str, retry: Retry = DEFAULT_RETRY) -> bool:
"""
Checks for the existence of a file in Google Cloud Storage.
:param bucket_name: The Google Cloud Storage bucket where the object is.
:param object_name: The name of the blob_name to check in the Google cloud
storage bucket.
+ :param retry: (Optional) How to retry the RPC
"""
client = self.get_conn()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name=object_name)
- return blob.exists()
+ return blob.exists(retry=retry)
def get_blob_update_time(self, bucket_name: str, object_name: str):
"""
diff --git a/airflow/providers/google/cloud/sensors/gcs.py b/airflow/providers/google/cloud/sensors/gcs.py
index 27c26abac0..8264a09a7b 100644
--- a/airflow/providers/google/cloud/sensors/gcs.py
+++ b/airflow/providers/google/cloud/sensors/gcs.py
@@ -23,6 +23,9 @@ import textwrap
from datetime import datetime
from typing import TYPE_CHECKING, Callable, Sequence
+from google.api_core.retry import Retry
+from google.cloud.storage.retry import DEFAULT_RETRY
+
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.sensors.base import BaseSensorOperator, poke_mode_only
@@ -51,6 +54,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
+ :param retry: (Optional) How to retry the RPC
"""
template_fields: Sequence[str] = (
@@ -68,6 +72,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
google_cloud_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
+ retry: Retry = DEFAULT_RETRY,
**kwargs,
) -> None:
@@ -77,6 +82,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
self.google_cloud_conn_id = google_cloud_conn_id
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
+ self.retry = retry
def poke(self, context: Context) -> bool:
self.log.info("Sensor checks existence of : %s, %s", self.bucket, self.object)
@@ -85,7 +91,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
delegate_to=self.delegate_to,
impersonation_chain=self.impersonation_chain,
)
- return hook.exists(self.bucket, self.object)
+ return hook.exists(self.bucket, self.object, self.retry)
def ts_function(context):
diff --git a/tests/providers/google/cloud/hooks/test_gcs.py b/tests/providers/google/cloud/hooks/test_gcs.py
index b4ae41d4a3..e339385617 100644
--- a/tests/providers/google/cloud/hooks/test_gcs.py
+++ b/tests/providers/google/cloud/hooks/test_gcs.py
@@ -32,6 +32,7 @@ import pytest
# dynamic storage type in google.cloud needs to be type-ignored
from google.cloud import exceptions, storage # type: ignore[attr-defined]
+from google.cloud.storage.retry import DEFAULT_RETRY
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.hooks import gcs
@@ -156,7 +157,7 @@ class TestGCSHook(unittest.TestCase):
assert response
bucket_mock.assert_called_once_with(test_bucket)
blob_object.assert_called_once_with(blob_name=test_object)
- exists_method.assert_called_once_with()
+ exists_method.assert_called_once_with(retry=DEFAULT_RETRY)
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_exists_nonexisting_object(self, mock_service):
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index ca4b0a80a2..bc242ce509 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -22,6 +22,7 @@ from unittest import TestCase, mock
import pendulum
import pytest
+from google.cloud.storage.retry import DEFAULT_RETRY
from airflow.exceptions import AirflowSensorTimeout
from airflow.models.dag import DAG, AirflowException
@@ -84,7 +85,7 @@ class TestGoogleCloudStorageObjectSensor(TestCase):
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
- mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET, TEST_OBJECT)
+ mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET, TEST_OBJECT, DEFAULT_RETRY)
class TestTsFunction(TestCase):