You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2024/02/16 10:48:09 UTC
(airflow) branch main updated: Fix GCSSynchronizeBucketsOperator timeout error (#37237)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 123b656151 Fix GCSSynchronizeBucketsOperator timeout error (#37237)
123b656151 is described below
commit 123b656151be6605712951c672a703d5b7abfc72
Author: Kevin George <ke...@gmail.com>
AuthorDate: Fri Feb 16 05:48:01 2024 -0500
Fix GCSSynchronizeBucketsOperator timeout error (#37237)
Update comment to be more clear
---
airflow/providers/google/cloud/hooks/gcs.py | 20 +++++++++++++---
tests/providers/google/cloud/hooks/test_gcs.py | 32 ++++++++------------------
2 files changed, 27 insertions(+), 25 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py
index 4aaa31f428..b35f647438 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -1213,15 +1213,19 @@ class GCSHook(GoogleBaseHook):
:return: none
"""
client = self.get_conn()
+
# Create bucket object
source_bucket_obj = client.bucket(source_bucket)
destination_bucket_obj = client.bucket(destination_bucket)
+
# Normalize parameters when they are passed
source_object = normalize_directory_path(source_object)
destination_object = normalize_directory_path(destination_object)
+
# Calculate the number of characters that are removed from the name, because they contain information
# about the parent's path
source_object_prefix_len = len(source_object) if source_object else 0
+
# Prepare synchronization plan
to_copy_blobs, to_delete_blobs, to_rewrite_blobs = self._prepare_sync_plan(
source_bucket=source_bucket_obj,
@@ -1246,13 +1250,14 @@ class GCSHook(GoogleBaseHook):
dst_object = self._calculate_sync_destination_path(
blob, destination_object, source_object_prefix_len
)
- self.copy(
+ self.rewrite(
source_bucket=source_bucket_obj.name,
source_object=blob.name,
destination_bucket=destination_bucket_obj.name,
destination_object=dst_object,
)
self.log.info("Blobs copied.")
+
# Delete redundant files
if not to_delete_blobs:
self.log.info("Skipped blobs deleting.")
@@ -1297,27 +1302,35 @@ class GCSHook(GoogleBaseHook):
destination_object: str | None,
recursive: bool,
) -> tuple[set[storage.Blob], set[storage.Blob], set[storage.Blob]]:
- # Calculate the number of characters that remove from the name, because they contain information
+ # Calculate the number of characters that are removed from the name, because they contain information
# about the parent's path
source_object_prefix_len = len(source_object) if source_object else 0
destination_object_prefix_len = len(destination_object) if destination_object else 0
delimiter = "/" if not recursive else None
+
# Fetch blobs list
source_blobs = list(source_bucket.list_blobs(prefix=source_object, delimiter=delimiter))
destination_blobs = list(
destination_bucket.list_blobs(prefix=destination_object, delimiter=delimiter)
)
+
# Create indexes that allow you to identify blobs based on their name
source_names_index = {a.name[source_object_prefix_len:]: a for a in source_blobs}
destination_names_index = {a.name[destination_object_prefix_len:]: a for a in destination_blobs}
+
# Create sets with names without parent object name
source_names = set(source_names_index.keys())
+ # Discard the empty-string key from the source set: it comes from an object whose name equals the
+ # source prefix itself, and copying it would create an empty subdirectory in the destination bucket
+ source_names.discard("")
destination_names = set(destination_names_index.keys())
+
# Determine objects to copy and delete
to_copy = source_names - destination_names
to_delete = destination_names - source_names
to_copy_blobs: set[storage.Blob] = {source_names_index[a] for a in to_copy}
to_delete_blobs: set[storage.Blob] = {destination_names_index[a] for a in to_delete}
+
# Find names that are in both buckets
names_to_check = source_names.intersection(destination_names)
to_rewrite_blobs: set[storage.Blob] = set()
@@ -1325,9 +1338,10 @@ class GCSHook(GoogleBaseHook):
for current_name in names_to_check:
source_blob = source_names_index[current_name]
destination_blob = destination_names_index[current_name]
- # if the objects are different, save it
+ # If the objects are different, save it
if source_blob.crc32c != destination_blob.crc32c:
to_rewrite_blobs.add(source_blob)
+
return to_copy_blobs, to_delete_blobs, to_rewrite_blobs
diff --git a/tests/providers/google/cloud/hooks/test_gcs.py b/tests/providers/google/cloud/hooks/test_gcs.py
index 825a357d39..1a0ce20030 100644
--- a/tests/providers/google/cloud/hooks/test_gcs.py
+++ b/tests/providers/google/cloud/hooks/test_gcs.py
@@ -1081,7 +1081,6 @@ class TestSyncGcsHook:
def test_should_do_nothing_when_buckets_is_empty(
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = []
destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1104,7 +1103,6 @@ class TestSyncGcsHook:
def test_should_append_slash_to_object_if_missing(
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = []
destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1124,7 +1122,6 @@ class TestSyncGcsHook:
@mock.patch(GCS_STRING.format("GCSHook.delete"))
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_should_copy_files(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = [
self._create_blob("FILE_A", "C1"),
@@ -1135,31 +1132,30 @@ class TestSyncGcsHook:
mock_get_conn.return_value.bucket.side_effect = [source_bucket, destination_bucket]
self.gcs_hook.sync(source_bucket="SOURCE_BUCKET", destination_bucket="DEST_BUCKET")
mock_delete.assert_not_called()
- mock_rewrite.assert_not_called()
- mock_copy.assert_has_calls(
+ mock_rewrite.assert_has_calls(
[
mock.call(
- destination_bucket="DEST_BUCKET",
- destination_object="FILE_A",
source_bucket="SOURCE_BUCKET",
source_object="FILE_A",
+ destination_bucket="DEST_BUCKET",
+ destination_object="FILE_A",
),
mock.call(
- destination_bucket="DEST_BUCKET",
- destination_object="FILE_B",
source_bucket="SOURCE_BUCKET",
source_object="FILE_B",
+ destination_bucket="DEST_BUCKET",
+ destination_object="FILE_B",
),
],
any_order=True,
)
+ mock_copy.assert_not_called()
@mock.patch(GCS_STRING.format("GCSHook.copy"))
@mock.patch(GCS_STRING.format("GCSHook.rewrite"))
@mock.patch(GCS_STRING.format("GCSHook.delete"))
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_should_copy_files_non_recursive(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = [
self._create_blob("FILE_A", "C1"),
@@ -1177,7 +1173,6 @@ class TestSyncGcsHook:
@mock.patch(GCS_STRING.format("GCSHook.delete"))
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_should_copy_files_to_subdirectory(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = [
self._create_blob("FILE_A", "C1"),
@@ -1190,8 +1185,7 @@ class TestSyncGcsHook:
source_bucket="SOURCE_BUCKET", destination_bucket="DEST_BUCKET", destination_object="DEST_OBJ/"
)
mock_delete.assert_not_called()
- mock_rewrite.assert_not_called()
- mock_copy.assert_has_calls(
+ mock_rewrite.assert_has_calls(
[
mock.call(
source_bucket="SOURCE_BUCKET",
@@ -1208,13 +1202,13 @@ class TestSyncGcsHook:
],
any_order=True,
)
+ mock_copy.assert_not_called()
@mock.patch(GCS_STRING.format("GCSHook.copy"))
@mock.patch(GCS_STRING.format("GCSHook.rewrite"))
@mock.patch(GCS_STRING.format("GCSHook.delete"))
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_should_copy_files_from_subdirectory(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = [
self._create_blob("SRC_OBJ/FILE_A", "C1"),
@@ -1227,8 +1221,7 @@ class TestSyncGcsHook:
source_bucket="SOURCE_BUCKET", destination_bucket="DEST_BUCKET", source_object="SRC_OBJ/"
)
mock_delete.assert_not_called()
- mock_rewrite.assert_not_called()
- mock_copy.assert_has_calls(
+ mock_rewrite.assert_has_calls(
[
mock.call(
source_bucket="SOURCE_BUCKET",
@@ -1245,13 +1238,13 @@ class TestSyncGcsHook:
],
any_order=True,
)
+ mock_copy.assert_not_called()
@mock.patch(GCS_STRING.format("GCSHook.copy"))
@mock.patch(GCS_STRING.format("GCSHook.rewrite"))
@mock.patch(GCS_STRING.format("GCSHook.delete"))
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_should_overwrite_files(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = [
self._create_blob("FILE_A", "C1"),
@@ -1293,7 +1286,6 @@ class TestSyncGcsHook:
def test_should_overwrite_files_to_subdirectory(
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = [
self._create_blob("FILE_A", "C1"),
@@ -1338,7 +1330,6 @@ class TestSyncGcsHook:
def test_should_overwrite_files_from_subdirectory(
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = [
self._create_blob("SRC_OBJ/FILE_A", "C1"),
@@ -1381,7 +1372,6 @@ class TestSyncGcsHook:
@mock.patch(GCS_STRING.format("GCSHook.delete"))
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
def test_should_delete_extra_files(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = []
destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1407,7 +1397,6 @@ class TestSyncGcsHook:
def test_should_not_delete_extra_files_when_delete_extra_files_is_disabled(
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = []
destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1430,7 +1419,6 @@ class TestSyncGcsHook:
def test_should_not_overwrite_when_overwrite_is_disabled(
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
):
- # mock_get_conn.return_value =
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
source_bucket.list_blobs.return_value = [
self._create_blob("SRC_OBJ/FILE_A", "C1", source_bucket),