You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/06 12:17:32 UTC

[airflow] branch main updated: Fix GCSToGCSOperator cannot copy a single file/folder without copying other files/folders with that prefix (#24039)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ec84ffe71c Fix GCSToGCSOperator cannot copy a single file/folder without copying other files/folders with that prefix (#24039)
ec84ffe71c is described below

commit ec84ffe71cfa8246155b9b4cb10bf2167e75adcf
Author: GitStart-AirFlow <10...@users.noreply.github.com>
AuthorDate: Mon Jun 6 13:17:18 2022 +0100

    Fix GCSToGCSOperator cannot copy a single file/folder without copying other files/folders with that prefix (#24039)
---
 .../providers/google/cloud/transfers/gcs_to_gcs.py |  6 ++++++
 .../google/cloud/transfers/test_gcs_to_gcs.py      | 22 ++++++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index 5a10aa7a32..fa4f523ea4 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -89,6 +89,8 @@ class GCSToGCSOperator(BaseOperator):
         account from the list granting this role to the originating account (templated).
     :param source_object_required: Whether you want to raise an exception when the source object
         doesn't exist. It doesn't have any effect when the source objects are folders or patterns.
+    :param exact_match: When True, only the object whose name exactly matches the source object is
+        copied; other objects that merely share that name as a prefix are skipped.
 
     :Example:
 
@@ -189,6 +191,7 @@ class GCSToGCSOperator(BaseOperator):
         is_older_than=None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         source_object_required=False,
+        exact_match=False,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -208,6 +211,7 @@ class GCSToGCSOperator(BaseOperator):
         self.is_older_than = is_older_than
         self.impersonation_chain = impersonation_chain
         self.source_object_required = source_object_required
+        self.exact_match = exact_match
 
     def execute(self, context: 'Context'):
 
@@ -341,6 +345,8 @@ class GCSToGCSOperator(BaseOperator):
                 raise AirflowException(msg)
 
         for source_obj in objects:
+            if self.exact_match and (source_obj != prefix or not source_obj.endswith(prefix)):
+                continue
             if self.destination_object is None:
                 destination_object = source_obj
             else:
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 7d5af935ea..61186591f3 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -121,6 +121,28 @@ class TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase):
         ]
         mock_hook.return_value.list.assert_has_calls(mock_calls)
 
+    @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
+    def test_copy_file_with_exact_match(self, mock_hook):
+        SOURCE_FILES = [
+            'test_object.txt',
+            'test_object.txt.copy/',
+            'test_object.txt.folder/',
+        ]
+        mock_hook.return_value.list.return_value = SOURCE_FILES
+        operator = GCSToGCSOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_NO_WILDCARD,
+            destination_bucket=DESTINATION_BUCKET,
+            exact_match=True,
+        )
+
+        operator.execute(None)
+        mock_calls = [
+            mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None),
+        ]
+        mock_hook.return_value.list.assert_has_calls(mock_calls)
+
     @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
     def test_execute_prefix_and_suffix(self, mock_hook):
         operator = GCSToGCSOperator(