You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/12/24 13:12:14 UTC
[airflow] branch master updated: Add timeout option to gcs hook methods. (#13156)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 323084e Add timeout option to gcs hook methods. (#13156)
323084e is described below
commit 323084e97ddacbc5512709bf0cad8f53082d16b0
Author: Joshua Carp <jm...@gmail.com>
AuthorDate: Thu Dec 24 08:12:06 2020 -0500
Add timeout option to gcs hook methods. (#13156)
---
airflow/providers/google/cloud/hooks/gcs.py | 30 ++++++++++++++++++++------
setup.py | 2 +-
tests/providers/google/cloud/hooks/test_gcs.py | 14 ++++++------
3 files changed, 32 insertions(+), 14 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py
index 0ca3961..72a23ea 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -40,6 +40,9 @@ from airflow.version import version
RT = TypeVar('RT') # pylint: disable=invalid-name
T = TypeVar("T", bound=Callable) # pylint: disable=invalid-name
+# Use default timeout from google-cloud-storage
+DEFAULT_TIMEOUT = 60
+
def _fallback_object_url_to_object_name_and_bucket_name(
object_url_keyword_arg_name='object_url',
@@ -257,7 +260,12 @@ class GCSHook(GoogleBaseHook):
)
def download(
- self, object_name: str, bucket_name: Optional[str], filename: Optional[str] = None
+ self,
+ object_name: str,
+ bucket_name: Optional[str],
+ filename: Optional[str] = None,
+ chunk_size: Optional[int] = None,
+ timeout: Optional[int] = DEFAULT_TIMEOUT,
) -> Union[str, bytes]:
"""
Downloads a file from Google Cloud Storage.
@@ -273,16 +281,20 @@ class GCSHook(GoogleBaseHook):
:type object_name: str
:param filename: If set, a local file path where the file should be written to.
:type filename: str
+ :param chunk_size: Blob chunk size.
+ :type chunk_size: int
+ :param timeout: Request timeout in seconds.
+ :type timeout: int
"""
# TODO: future improvement check file size before downloading,
# to check for local space availability
client = self.get_conn()
bucket = client.bucket(bucket_name)
- blob = bucket.blob(blob_name=object_name)
+ blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)
if filename:
- blob.download_to_filename(filename)
+ blob.download_to_filename(filename, timeout=timeout)
self.log.info('File downloaded to %s', filename)
return filename
else:
@@ -359,6 +371,8 @@ class GCSHook(GoogleBaseHook):
mime_type: Optional[str] = None,
gzip: bool = False,
encoding: str = 'utf-8',
+ chunk_size: Optional[int] = None,
+ timeout: Optional[int] = DEFAULT_TIMEOUT,
) -> None:
"""
Uploads a local file or file data as string or bytes to Google Cloud Storage.
@@ -377,10 +391,14 @@ class GCSHook(GoogleBaseHook):
:type gzip: bool
:param encoding: bytes encoding for file data if provided as string
:type encoding: str
+ :param chunk_size: Blob chunk size.
+ :type chunk_size: int
+ :param timeout: Request timeout in seconds.
+ :type timeout: int
"""
client = self.get_conn()
bucket = client.bucket(bucket_name)
- blob = bucket.blob(blob_name=object_name)
+ blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)
if filename and data:
raise ValueError(
"'filename' and 'data' parameter provided. Please "
@@ -398,7 +416,7 @@ class GCSHook(GoogleBaseHook):
shutil.copyfileobj(f_in, f_out)
filename = filename_gz
- blob.upload_from_filename(filename=filename, content_type=mime_type)
+ blob.upload_from_filename(filename=filename, content_type=mime_type, timeout=timeout)
if gzip:
os.remove(filename)
self.log.info('File %s uploaded to %s in %s bucket', filename, object_name, bucket_name)
@@ -412,7 +430,7 @@ class GCSHook(GoogleBaseHook):
with gz.GzipFile(fileobj=out, mode="w") as f:
f.write(data)
data = out.getvalue()
- blob.upload_from_string(data, content_type=mime_type)
+ blob.upload_from_string(data, content_type=mime_type, timeout=timeout)
self.log.info('Data stream uploaded to %s in %s bucket', object_name, bucket_name)
else:
raise ValueError("'filename' and 'data' parameter missing. One is required to upload to gcs.")
diff --git a/setup.py b/setup.py
index 33f2472..de3daf9 100644
--- a/setup.py
+++ b/setup.py
@@ -271,7 +271,7 @@ google = [
'google-cloud-secret-manager>=0.2.0,<2.0.0',
'google-cloud-spanner>=1.10.0,<2.0.0',
'google-cloud-speech>=0.36.3,<2.0.0',
- 'google-cloud-storage>=1.16,<2.0.0',
+ 'google-cloud-storage>=1.30,<2.0.0',
'google-cloud-tasks>=1.2.1,<2.0.0',
'google-cloud-texttospeech>=0.4.0,<2.0.0',
'google-cloud-translate>=1.5.0,<2.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_gcs.py b/tests/providers/google/cloud/hooks/test_gcs.py
index dffe5ad..1ce44bb 100644
--- a/tests/providers/google/cloud/hooks/test_gcs.py
+++ b/tests/providers/google/cloud/hooks/test_gcs.py
@@ -672,7 +672,7 @@ class TestGCSHook(unittest.TestCase):
)
self.assertEqual(response, test_file)
- download_filename_method.assert_called_once_with(test_file)
+ download_filename_method.assert_called_once_with(test_file, timeout=60)
@mock.patch(GCS_STRING.format('NamedTemporaryFile'))
@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
@@ -697,7 +697,7 @@ class TestGCSHook(unittest.TestCase):
with self.gcs_hook.provide_file(bucket_name=test_bucket, object_name=test_object) as response:
self.assertEqual(test_file, response.name)
- download_filename_method.assert_called_once_with(test_file)
+ download_filename_method.assert_called_once_with(test_file, timeout=60)
mock_temp_file.assert_has_calls(
[
mock.call(suffix='test_object'),
@@ -762,7 +762,7 @@ class TestGCSHookUpload(unittest.TestCase):
self.gcs_hook.upload(test_bucket, test_object, filename=self.testfile.name)
upload_method.assert_called_once_with(
- filename=self.testfile.name, content_type='application/octet-stream'
+ filename=self.testfile.name, content_type='application/octet-stream', timeout=60
)
@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
@@ -782,7 +782,7 @@ class TestGCSHookUpload(unittest.TestCase):
self.gcs_hook.upload(test_bucket, test_object, data=self.testdata_str)
- upload_method.assert_called_once_with(self.testdata_str, content_type='text/plain')
+ upload_method.assert_called_once_with(self.testdata_str, content_type='text/plain', timeout=60)
@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
def test_upload_data_bytes(self, mock_service):
@@ -793,7 +793,7 @@ class TestGCSHookUpload(unittest.TestCase):
self.gcs_hook.upload(test_bucket, test_object, data=self.testdata_bytes)
- upload_method.assert_called_once_with(self.testdata_bytes, content_type='text/plain')
+ upload_method.assert_called_once_with(self.testdata_bytes, content_type='text/plain', timeout=60)
@mock.patch(GCS_STRING.format('BytesIO'))
@mock.patch(GCS_STRING.format('gz.GzipFile'))
@@ -812,7 +812,7 @@ class TestGCSHookUpload(unittest.TestCase):
byte_str = bytes(self.testdata_str, encoding)
mock_gzip.assert_called_once_with(fileobj=mock_bytes_io.return_value, mode="w")
gzip_ctx.write.assert_called_once_with(byte_str)
- upload_method.assert_called_once_with(data, content_type='text/plain')
+ upload_method.assert_called_once_with(data, content_type='text/plain', timeout=60)
@mock.patch(GCS_STRING.format('BytesIO'))
@mock.patch(GCS_STRING.format('gz.GzipFile'))
@@ -829,7 +829,7 @@ class TestGCSHookUpload(unittest.TestCase):
mock_gzip.assert_called_once_with(fileobj=mock_bytes_io.return_value, mode="w")
gzip_ctx.write.assert_called_once_with(self.testdata_bytes)
- upload_method.assert_called_once_with(data, content_type='text/plain')
+ upload_method.assert_called_once_with(data, content_type='text/plain', timeout=60)
@mock.patch(GCS_STRING.format('GCSHook.get_conn'))
def test_upload_exceptions(self, mock_service):