You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2024/02/16 10:48:09 UTC

(airflow) branch main updated: Fix GCSSynchronizeBucketsOperator timeout error (#37237)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 123b656151 Fix GCSSynchronizeBucketsOperator timeout error (#37237)
123b656151 is described below

commit 123b656151be6605712951c672a703d5b7abfc72
Author: Kevin George <ke...@gmail.com>
AuthorDate: Fri Feb 16 05:48:01 2024 -0500

    Fix GCSSynchronizeBucketsOperator timeout error (#37237)
    
    Update comment to be clearer
---
 airflow/providers/google/cloud/hooks/gcs.py    | 20 +++++++++++++---
 tests/providers/google/cloud/hooks/test_gcs.py | 32 ++++++++------------------
 2 files changed, 27 insertions(+), 25 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py
index 4aaa31f428..b35f647438 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -1213,15 +1213,19 @@ class GCSHook(GoogleBaseHook):
         :return: none
         """
         client = self.get_conn()
+
         # Create bucket object
         source_bucket_obj = client.bucket(source_bucket)
         destination_bucket_obj = client.bucket(destination_bucket)
+
         # Normalize parameters when they are passed
         source_object = normalize_directory_path(source_object)
         destination_object = normalize_directory_path(destination_object)
+
         # Calculate the number of characters that remove from the name, because they contain information
         # about the parent's path
         source_object_prefix_len = len(source_object) if source_object else 0
+
         # Prepare synchronization plan
         to_copy_blobs, to_delete_blobs, to_rewrite_blobs = self._prepare_sync_plan(
             source_bucket=source_bucket_obj,
@@ -1246,13 +1250,14 @@ class GCSHook(GoogleBaseHook):
                 dst_object = self._calculate_sync_destination_path(
                     blob, destination_object, source_object_prefix_len
                 )
-                self.copy(
+                self.rewrite(
                     source_bucket=source_bucket_obj.name,
                     source_object=blob.name,
                     destination_bucket=destination_bucket_obj.name,
                     destination_object=dst_object,
                 )
             self.log.info("Blobs copied.")
+
         # Delete redundant files
         if not to_delete_blobs:
             self.log.info("Skipped blobs deleting.")
@@ -1297,27 +1302,35 @@ class GCSHook(GoogleBaseHook):
         destination_object: str | None,
         recursive: bool,
     ) -> tuple[set[storage.Blob], set[storage.Blob], set[storage.Blob]]:
-        # Calculate the number of characters that remove from the name, because they contain information
+        # Calculate the number of characters that are removed from the name, because they contain information
         # about the parent's path
         source_object_prefix_len = len(source_object) if source_object else 0
         destination_object_prefix_len = len(destination_object) if destination_object else 0
         delimiter = "/" if not recursive else None
+
         # Fetch blobs list
         source_blobs = list(source_bucket.list_blobs(prefix=source_object, delimiter=delimiter))
         destination_blobs = list(
             destination_bucket.list_blobs(prefix=destination_object, delimiter=delimiter)
         )
+
         # Create indexes that allow you to identify blobs based on their name
         source_names_index = {a.name[source_object_prefix_len:]: a for a in source_blobs}
         destination_names_index = {a.name[destination_object_prefix_len:]: a for a in destination_blobs}
+
         # Create sets with names without parent object name
         source_names = set(source_names_index.keys())
+        # Discards empty string from source set that creates an empty subdirectory in
+        # destination bucket with source subdirectory name
+        source_names.discard("")
         destination_names = set(destination_names_index.keys())
+
         # Determine objects to copy and delete
         to_copy = source_names - destination_names
         to_delete = destination_names - source_names
         to_copy_blobs: set[storage.Blob] = {source_names_index[a] for a in to_copy}
         to_delete_blobs: set[storage.Blob] = {destination_names_index[a] for a in to_delete}
+
         # Find names that are in both buckets
         names_to_check = source_names.intersection(destination_names)
         to_rewrite_blobs: set[storage.Blob] = set()
@@ -1325,9 +1338,10 @@ class GCSHook(GoogleBaseHook):
         for current_name in names_to_check:
             source_blob = source_names_index[current_name]
             destination_blob = destination_names_index[current_name]
-            # if the objects are different, save it
+            # If the objects are different, save it
             if source_blob.crc32c != destination_blob.crc32c:
                 to_rewrite_blobs.add(source_blob)
+
         return to_copy_blobs, to_delete_blobs, to_rewrite_blobs
 
 
diff --git a/tests/providers/google/cloud/hooks/test_gcs.py b/tests/providers/google/cloud/hooks/test_gcs.py
index 825a357d39..1a0ce20030 100644
--- a/tests/providers/google/cloud/hooks/test_gcs.py
+++ b/tests/providers/google/cloud/hooks/test_gcs.py
@@ -1081,7 +1081,6 @@ class TestSyncGcsHook:
     def test_should_do_nothing_when_buckets_is_empty(
         self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
     ):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = []
         destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1104,7 +1103,6 @@ class TestSyncGcsHook:
     def test_should_append_slash_to_object_if_missing(
         self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
     ):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = []
         destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1124,7 +1122,6 @@ class TestSyncGcsHook:
     @mock.patch(GCS_STRING.format("GCSHook.delete"))
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_should_copy_files(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = [
             self._create_blob("FILE_A", "C1"),
@@ -1135,31 +1132,30 @@ class TestSyncGcsHook:
         mock_get_conn.return_value.bucket.side_effect = [source_bucket, destination_bucket]
         self.gcs_hook.sync(source_bucket="SOURCE_BUCKET", destination_bucket="DEST_BUCKET")
         mock_delete.assert_not_called()
-        mock_rewrite.assert_not_called()
-        mock_copy.assert_has_calls(
+        mock_rewrite.assert_has_calls(
             [
                 mock.call(
-                    destination_bucket="DEST_BUCKET",
-                    destination_object="FILE_A",
                     source_bucket="SOURCE_BUCKET",
                     source_object="FILE_A",
+                    destination_bucket="DEST_BUCKET",
+                    destination_object="FILE_A",
                 ),
                 mock.call(
-                    destination_bucket="DEST_BUCKET",
-                    destination_object="FILE_B",
                     source_bucket="SOURCE_BUCKET",
                     source_object="FILE_B",
+                    destination_bucket="DEST_BUCKET",
+                    destination_object="FILE_B",
                 ),
             ],
             any_order=True,
         )
+        mock_copy.assert_not_called()
 
     @mock.patch(GCS_STRING.format("GCSHook.copy"))
     @mock.patch(GCS_STRING.format("GCSHook.rewrite"))
     @mock.patch(GCS_STRING.format("GCSHook.delete"))
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_should_copy_files_non_recursive(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = [
             self._create_blob("FILE_A", "C1"),
@@ -1177,7 +1173,6 @@ class TestSyncGcsHook:
     @mock.patch(GCS_STRING.format("GCSHook.delete"))
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_should_copy_files_to_subdirectory(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = [
             self._create_blob("FILE_A", "C1"),
@@ -1190,8 +1185,7 @@ class TestSyncGcsHook:
             source_bucket="SOURCE_BUCKET", destination_bucket="DEST_BUCKET", destination_object="DEST_OBJ/"
         )
         mock_delete.assert_not_called()
-        mock_rewrite.assert_not_called()
-        mock_copy.assert_has_calls(
+        mock_rewrite.assert_has_calls(
             [
                 mock.call(
                     source_bucket="SOURCE_BUCKET",
@@ -1208,13 +1202,13 @@ class TestSyncGcsHook:
             ],
             any_order=True,
         )
+        mock_copy.assert_not_called()
 
     @mock.patch(GCS_STRING.format("GCSHook.copy"))
     @mock.patch(GCS_STRING.format("GCSHook.rewrite"))
     @mock.patch(GCS_STRING.format("GCSHook.delete"))
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_should_copy_files_from_subdirectory(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = [
             self._create_blob("SRC_OBJ/FILE_A", "C1"),
@@ -1227,8 +1221,7 @@ class TestSyncGcsHook:
             source_bucket="SOURCE_BUCKET", destination_bucket="DEST_BUCKET", source_object="SRC_OBJ/"
         )
         mock_delete.assert_not_called()
-        mock_rewrite.assert_not_called()
-        mock_copy.assert_has_calls(
+        mock_rewrite.assert_has_calls(
             [
                 mock.call(
                     source_bucket="SOURCE_BUCKET",
@@ -1245,13 +1238,13 @@ class TestSyncGcsHook:
             ],
             any_order=True,
         )
+        mock_copy.assert_not_called()
 
     @mock.patch(GCS_STRING.format("GCSHook.copy"))
     @mock.patch(GCS_STRING.format("GCSHook.rewrite"))
     @mock.patch(GCS_STRING.format("GCSHook.delete"))
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_should_overwrite_files(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = [
             self._create_blob("FILE_A", "C1"),
@@ -1293,7 +1286,6 @@ class TestSyncGcsHook:
     def test_should_overwrite_files_to_subdirectory(
         self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
     ):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = [
             self._create_blob("FILE_A", "C1"),
@@ -1338,7 +1330,6 @@ class TestSyncGcsHook:
     def test_should_overwrite_files_from_subdirectory(
         self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
     ):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = [
             self._create_blob("SRC_OBJ/FILE_A", "C1"),
@@ -1381,7 +1372,6 @@ class TestSyncGcsHook:
     @mock.patch(GCS_STRING.format("GCSHook.delete"))
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_should_delete_extra_files(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = []
         destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1407,7 +1397,6 @@ class TestSyncGcsHook:
     def test_should_not_delete_extra_files_when_delete_extra_files_is_disabled(
         self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
     ):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = []
         destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1430,7 +1419,6 @@ class TestSyncGcsHook:
     def test_should_not_overwrite_when_overwrite_is_disabled(
         self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
     ):
-        # mock_get_conn.return_value =
         source_bucket = self._create_bucket(name="SOURCE_BUCKET")
         source_bucket.list_blobs.return_value = [
             self._create_blob("SRC_OBJ/FILE_A", "C1", source_bucket),