You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/12/24 13:12:14 UTC

[airflow] branch master updated: Add timeout option to gcs hook methods. (#13156)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 323084e  Add timeout option to gcs hook methods. (#13156)
323084e is described below

commit 323084e97ddacbc5512709bf0cad8f53082d16b0
Author: Joshua Carp <jm...@gmail.com>
AuthorDate: Thu Dec 24 08:12:06 2020 -0500

    Add timeout option to gcs hook methods. (#13156)
---
 airflow/providers/google/cloud/hooks/gcs.py    | 30 ++++++++++++++++++++------
 setup.py                                       |  2 +-
 tests/providers/google/cloud/hooks/test_gcs.py | 14 ++++++------
 3 files changed, 32 insertions(+), 14 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py
index 0ca3961..72a23ea 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -40,6 +40,9 @@ from airflow.version import version
 RT = TypeVar('RT')  # pylint: disable=invalid-name
 T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
 
+# Default request timeout in seconds; matches the default used by google-cloud-storage
+DEFAULT_TIMEOUT = 60
+
 
 def _fallback_object_url_to_object_name_and_bucket_name(
     object_url_keyword_arg_name='object_url',
@@ -257,7 +260,12 @@ class GCSHook(GoogleBaseHook):
         )
 
     def download(
-        self, object_name: str, bucket_name: Optional[str], filename: Optional[str] = None
+        self,
+        object_name: str,
+        bucket_name: Optional[str],
+        filename: Optional[str] = None,
+        chunk_size: Optional[int] = None,
+        timeout: Optional[int] = DEFAULT_TIMEOUT,
     ) -> Union[str, bytes]:
         """
         Downloads a file from Google Cloud Storage.
@@ -273,16 +281,20 @@ class GCSHook(GoogleBaseHook):
         :type object_name: str
         :param filename: If set, a local file path where the file should be written to.
         :type filename: str
+        :param chunk_size: Blob chunk size in bytes (must be a multiple of 256 KB).
+        :type chunk_size: int
+        :param timeout: Request timeout in seconds.
+        :type timeout: int
         """
         # TODO: future improvement check file size before downloading,
         #  to check for local space availability
 
         client = self.get_conn()
         bucket = client.bucket(bucket_name)
-        blob = bucket.blob(blob_name=object_name)
+        blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)
 
         if filename:
-            blob.download_to_filename(filename)
+            blob.download_to_filename(filename, timeout=timeout)
             self.log.info('File downloaded to %s', filename)
             return filename
         else:
@@ -359,6 +371,8 @@ class GCSHook(GoogleBaseHook):
         mime_type: Optional[str] = None,
         gzip: bool = False,
         encoding: str = 'utf-8',
+        chunk_size: Optional[int] = None,
+        timeout: Optional[int] = DEFAULT_TIMEOUT,
     ) -> None:
         """
         Uploads a local file or file data as string or bytes to Google Cloud Storage.
@@ -377,10 +391,14 @@ class GCSHook(GoogleBaseHook):
         :type gzip: bool
         :param encoding: bytes encoding for file data if provided as string
         :type encoding: str
+        :param chunk_size: Blob chunk size in bytes (must be a multiple of 256 KB).
+        :type chunk_size: int
+        :param timeout: Request timeout in seconds.
+        :type timeout: int
         """
         client = self.get_conn()
         bucket = client.bucket(bucket_name)
-        blob = bucket.blob(blob_name=object_name)
+        blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)
         if filename and data:
             raise ValueError(
                 "'filename' and 'data' parameter provided. Please "
@@ -398,7 +416,7 @@ class GCSHook(GoogleBaseHook):
                         shutil.copyfileobj(f_in, f_out)
                         filename = filename_gz
 
-            blob.upload_from_filename(filename=filename, content_type=mime_type)
+            blob.upload_from_filename(filename=filename, content_type=mime_type, timeout=timeout)
             if gzip:
                 os.remove(filename)
             self.log.info('File %s uploaded to %s in %s bucket', filename, object_name, bucket_name)
@@ -412,7 +430,7 @@ class GCSHook(GoogleBaseHook):
                 with gz.GzipFile(fileobj=out, mode="w") as f:
                     f.write(data)
                 data = out.getvalue()
-            blob.upload_from_string(data, content_type=mime_type)
+            blob.upload_from_string(data, content_type=mime_type, timeout=timeout)
             self.log.info('Data stream uploaded to %s in %s bucket', object_name, bucket_name)
         else:
             raise ValueError("'filename' and 'data' parameter missing. One is required to upload to gcs.")
diff --git a/setup.py b/setup.py
index 33f2472..de3daf9 100644
--- a/setup.py
+++ b/setup.py
@@ -271,7 +271,7 @@ google = [
     'google-cloud-secret-manager>=0.2.0,<2.0.0',
     'google-cloud-spanner>=1.10.0,<2.0.0',
     'google-cloud-speech>=0.36.3,<2.0.0',
-    'google-cloud-storage>=1.16,<2.0.0',
+    'google-cloud-storage>=1.30,<2.0.0',
     'google-cloud-tasks>=1.2.1,<2.0.0',
     'google-cloud-texttospeech>=0.4.0,<2.0.0',
     'google-cloud-translate>=1.5.0,<2.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_gcs.py b/tests/providers/google/cloud/hooks/test_gcs.py
index dffe5ad..1ce44bb 100644
--- a/tests/providers/google/cloud/hooks/test_gcs.py
+++ b/tests/providers/google/cloud/hooks/test_gcs.py
@@ -672,7 +672,7 @@ class TestGCSHook(unittest.TestCase):
         )
 
         self.assertEqual(response, test_file)
-        download_filename_method.assert_called_once_with(test_file)
+        download_filename_method.assert_called_once_with(test_file, timeout=60)
 
     @mock.patch(GCS_STRING.format('NamedTemporaryFile'))
     @mock.patch(GCS_STRING.format('GCSHook.get_conn'))
@@ -697,7 +697,7 @@ class TestGCSHook(unittest.TestCase):
         with self.gcs_hook.provide_file(bucket_name=test_bucket, object_name=test_object) as response:
 
             self.assertEqual(test_file, response.name)
-        download_filename_method.assert_called_once_with(test_file)
+        download_filename_method.assert_called_once_with(test_file, timeout=60)
         mock_temp_file.assert_has_calls(
             [
                 mock.call(suffix='test_object'),
@@ -762,7 +762,7 @@ class TestGCSHookUpload(unittest.TestCase):
         self.gcs_hook.upload(test_bucket, test_object, filename=self.testfile.name)
 
         upload_method.assert_called_once_with(
-            filename=self.testfile.name, content_type='application/octet-stream'
+            filename=self.testfile.name, content_type='application/octet-stream', timeout=60
         )
 
     @mock.patch(GCS_STRING.format('GCSHook.get_conn'))
@@ -782,7 +782,7 @@ class TestGCSHookUpload(unittest.TestCase):
 
         self.gcs_hook.upload(test_bucket, test_object, data=self.testdata_str)
 
-        upload_method.assert_called_once_with(self.testdata_str, content_type='text/plain')
+        upload_method.assert_called_once_with(self.testdata_str, content_type='text/plain', timeout=60)
 
     @mock.patch(GCS_STRING.format('GCSHook.get_conn'))
     def test_upload_data_bytes(self, mock_service):
@@ -793,7 +793,7 @@ class TestGCSHookUpload(unittest.TestCase):
 
         self.gcs_hook.upload(test_bucket, test_object, data=self.testdata_bytes)
 
-        upload_method.assert_called_once_with(self.testdata_bytes, content_type='text/plain')
+        upload_method.assert_called_once_with(self.testdata_bytes, content_type='text/plain', timeout=60)
 
     @mock.patch(GCS_STRING.format('BytesIO'))
     @mock.patch(GCS_STRING.format('gz.GzipFile'))
@@ -812,7 +812,7 @@ class TestGCSHookUpload(unittest.TestCase):
         byte_str = bytes(self.testdata_str, encoding)
         mock_gzip.assert_called_once_with(fileobj=mock_bytes_io.return_value, mode="w")
         gzip_ctx.write.assert_called_once_with(byte_str)
-        upload_method.assert_called_once_with(data, content_type='text/plain')
+        upload_method.assert_called_once_with(data, content_type='text/plain', timeout=60)
 
     @mock.patch(GCS_STRING.format('BytesIO'))
     @mock.patch(GCS_STRING.format('gz.GzipFile'))
@@ -829,7 +829,7 @@ class TestGCSHookUpload(unittest.TestCase):
 
         mock_gzip.assert_called_once_with(fileobj=mock_bytes_io.return_value, mode="w")
         gzip_ctx.write.assert_called_once_with(self.testdata_bytes)
-        upload_method.assert_called_once_with(data, content_type='text/plain')
+        upload_method.assert_called_once_with(data, content_type='text/plain', timeout=60)
 
     @mock.patch(GCS_STRING.format('GCSHook.get_conn'))
     def test_upload_exceptions(self, mock_service):