You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/12/04 17:36:46 UTC

[airflow] branch main updated: Add retry param in GCSObjectExistenceSensor (#27943)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cdff50557 Add retry param in GCSObjectExistenceSensor (#27943)
5cdff50557 is described below

commit 5cdff505574822ad3d2a226056246500e4adea2f
Author: Pankaj Singh <98...@users.noreply.github.com>
AuthorDate: Sun Dec 4 23:06:39 2022 +0530

    Add retry param in GCSObjectExistenceSensor (#27943)
---
 airflow/providers/google/cloud/hooks/gcs.py      | 7 +++++--
 airflow/providers/google/cloud/sensors/gcs.py    | 8 +++++++-
 tests/providers/google/cloud/hooks/test_gcs.py   | 3 ++-
 tests/providers/google/cloud/sensors/test_gcs.py | 3 ++-
 4 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py
index 7e2e081a46..68e1c74986 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -33,10 +33,12 @@ from typing import IO, Callable, Generator, Sequence, TypeVar, cast, overload
 from urllib.parse import urlsplit
 
 from google.api_core.exceptions import NotFound
+from google.api_core.retry import Retry
 
 # not sure why but mypy complains on missing `storage` but it is clearly there and is importable
 from google.cloud import storage  # type: ignore[attr-defined]
 from google.cloud.exceptions import GoogleCloudError
+from google.cloud.storage.retry import DEFAULT_RETRY
 
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.utils.helpers import normalize_directory_path
@@ -533,18 +535,19 @@ class GCSHook(GoogleBaseHook):
         else:
             raise ValueError("'filename' and 'data' parameter missing. One is required to upload to gcs.")
 
-    def exists(self, bucket_name: str, object_name: str) -> bool:
+    def exists(self, bucket_name: str, object_name: str, retry: Retry = DEFAULT_RETRY) -> bool:
         """
         Checks for the existence of a file in Google Cloud Storage.
 
         :param bucket_name: The Google Cloud Storage bucket where the object is.
         :param object_name: The name of the blob_name to check in the Google cloud
             storage bucket.
+        :param retry: (Optional) How to retry the RPC
         """
         client = self.get_conn()
         bucket = client.bucket(bucket_name)
         blob = bucket.blob(blob_name=object_name)
-        return blob.exists()
+        return blob.exists(retry=retry)
 
     def get_blob_update_time(self, bucket_name: str, object_name: str):
         """
diff --git a/airflow/providers/google/cloud/sensors/gcs.py b/airflow/providers/google/cloud/sensors/gcs.py
index 27c26abac0..8264a09a7b 100644
--- a/airflow/providers/google/cloud/sensors/gcs.py
+++ b/airflow/providers/google/cloud/sensors/gcs.py
@@ -23,6 +23,9 @@ import textwrap
 from datetime import datetime
 from typing import TYPE_CHECKING, Callable, Sequence
 
+from google.api_core.retry import Retry
+from google.cloud.storage.retry import DEFAULT_RETRY
+
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.sensors.base import BaseSensorOperator, poke_mode_only
@@ -51,6 +54,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
+    :param retry: (Optional) How to retry the RPC
     """
 
     template_fields: Sequence[str] = (
@@ -68,6 +72,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
         google_cloud_conn_id: str = "google_cloud_default",
         delegate_to: str | None = None,
         impersonation_chain: str | Sequence[str] | None = None,
+        retry: Retry = DEFAULT_RETRY,
         **kwargs,
     ) -> None:
 
@@ -77,6 +82,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
         self.google_cloud_conn_id = google_cloud_conn_id
         self.delegate_to = delegate_to
         self.impersonation_chain = impersonation_chain
+        self.retry = retry
 
     def poke(self, context: Context) -> bool:
         self.log.info("Sensor checks existence of : %s, %s", self.bucket, self.object)
@@ -85,7 +91,7 @@ class GCSObjectExistenceSensor(BaseSensorOperator):
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
         )
-        return hook.exists(self.bucket, self.object)
+        return hook.exists(self.bucket, self.object, self.retry)
 
 
 def ts_function(context):
diff --git a/tests/providers/google/cloud/hooks/test_gcs.py b/tests/providers/google/cloud/hooks/test_gcs.py
index b4ae41d4a3..e339385617 100644
--- a/tests/providers/google/cloud/hooks/test_gcs.py
+++ b/tests/providers/google/cloud/hooks/test_gcs.py
@@ -32,6 +32,7 @@ import pytest
 
 # dynamic storage type in google.cloud needs to be type-ignored
 from google.cloud import exceptions, storage  # type: ignore[attr-defined]
+from google.cloud.storage.retry import DEFAULT_RETRY
 
 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks import gcs
@@ -156,7 +157,7 @@ class TestGCSHook(unittest.TestCase):
         assert response
         bucket_mock.assert_called_once_with(test_bucket)
         blob_object.assert_called_once_with(blob_name=test_object)
-        exists_method.assert_called_once_with()
+        exists_method.assert_called_once_with(retry=DEFAULT_RETRY)
 
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_exists_nonexisting_object(self, mock_service):
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index ca4b0a80a2..bc242ce509 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -22,6 +22,7 @@ from unittest import TestCase, mock
 
 import pendulum
 import pytest
+from google.cloud.storage.retry import DEFAULT_RETRY
 
 from airflow.exceptions import AirflowSensorTimeout
 from airflow.models.dag import DAG, AirflowException
@@ -84,7 +85,7 @@ class TestGoogleCloudStorageObjectSensor(TestCase):
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
-        mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET, TEST_OBJECT)
+        mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET, TEST_OBJECT, DEFAULT_RETRY)
 
 
 class TestTsFunction(TestCase):