You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/06 12:17:32 UTC
[airflow] branch main updated: Fix GCSToGCSOperator cannot copy a single file/folder without copying other files/folders with that prefix (#24039)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ec84ffe71c Fix GCSToGCSOperator cannot copy a single file/folder without copying other files/folders with that prefix (#24039)
ec84ffe71c is described below
commit ec84ffe71cfa8246155b9b4cb10bf2167e75adcf
Author: GitStart-AirFlow <10...@users.noreply.github.com>
AuthorDate: Mon Jun 6 13:17:18 2022 +0100
Fix GCSToGCSOperator cannot copy a single file/folder without copying other files/folders with that prefix (#24039)
---
.../providers/google/cloud/transfers/gcs_to_gcs.py | 6 ++++++
.../google/cloud/transfers/test_gcs_to_gcs.py | 22 ++++++++++++++++++++++
2 files changed, 28 insertions(+)
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index 5a10aa7a32..fa4f523ea4 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -89,6 +89,8 @@ class GCSToGCSOperator(BaseOperator):
account from the list granting this role to the originating account (templated).
:param source_object_required: Whether you want to raise an exception when the source object
doesn't exist. It doesn't have any effect when the source objects are folders or patterns.
+ :param exact_match: When specified, only exact match of the source object (filename) will be
+ copied.
:Example:
@@ -189,6 +191,7 @@ class GCSToGCSOperator(BaseOperator):
is_older_than=None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
source_object_required=False,
+ exact_match=False,
**kwargs,
):
super().__init__(**kwargs)
@@ -208,6 +211,7 @@ class GCSToGCSOperator(BaseOperator):
self.is_older_than = is_older_than
self.impersonation_chain = impersonation_chain
self.source_object_required = source_object_required
+ self.exact_match = exact_match
def execute(self, context: 'Context'):
@@ -341,6 +345,8 @@ class GCSToGCSOperator(BaseOperator):
raise AirflowException(msg)
for source_obj in objects:
+ if self.exact_match and (source_obj != prefix or not source_obj.endswith(prefix)):
+ continue
if self.destination_object is None:
destination_object = source_obj
else:
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 7d5af935ea..61186591f3 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -121,6 +121,28 @@ class TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase):
]
mock_hook.return_value.list.assert_has_calls(mock_calls)
+ @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
+ def test_copy_file_with_exact_match(self, mock_hook):  # exercises exact_match=True when the listing also returns names that merely share the prefix
+ SOURCE_FILES = [  # only the first name equals the listing prefix asserted below; the other two just start with it
+ 'test_object.txt',
+ 'test_object.txt.copy/',
+ 'test_object.txt.folder/',
+ ]
+ mock_hook.return_value.list.return_value = SOURCE_FILES  # the mocked hook's list() returns all three names
+ operator = GCSToGCSOperator(
+ task_id=TASK_ID,
+ source_bucket=TEST_BUCKET,
+ source_object=SOURCE_OBJECT_NO_WILDCARD,  # presumably 'test_object.txt', matching the prefix asserted below — TODO confirm
+ destination_bucket=DESTINATION_BUCKET,
+ exact_match=True,  # flag under test: the operator change skips listed names that are not equal to the prefix
+ )
+
+ operator.execute(None)
+ mock_calls = [
+ mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None),
+ ]
+ mock_hook.return_value.list.assert_has_calls(mock_calls)  # NOTE(review): only the listing call is checked; exact_match filtering happens later in the per-object loop, so this passes even if filtering regresses — TODO also assert the hook's copy calls cover only 'test_object.txt'
+
@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_prefix_and_suffix(self, mock_hook):
operator = GCSToGCSOperator(