You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/07/25 20:55:11 UTC
[airflow] branch main updated: Fix GCStoGCS operator with replace
disabled and existing destination object (#16991)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 966b250 Fix GCStoGCS operator with replace disabled and existing destination object (#16991)
966b250 is described below
commit 966b2501995279b7b5f2e1d0bf1c63a511dd382e
Author: aslantar <lo...@uni-muenster.de>
AuthorDate: Sun Jul 25 22:54:46 2021 +0200
Fix GCStoGCS operator with replace disabled and existing destination object (#16991)
---
airflow/providers/google/cloud/transfers/gcs_to_gcs.py | 14 +++++++++++++-
.../google/cloud/transfers/test_gcs_to_gcs.py | 18 ++++++++++++++++++
2 files changed, 31 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index 9ab4370..7656ad8 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -354,7 +354,19 @@ class GCSToGCSOperator(BaseOperator):
# and only keep those files which are present in
# Source GCS bucket and not in Destination GCS bucket
- existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter)
+ if self.destination_object is None:
+ existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter)
+ else:
+ self.log.info("Replaced destination_object with source_object prefix.")
+ destination_objects = hook.list(
+ self.destination_bucket,
+ prefix=self.destination_object,
+ delimiter=delimiter,
+ )
+ existing_objects = [
+ dest_object.replace(self.destination_object, prefix_, 1)
+ for dest_object in destination_objects
+ ]
objects = set(objects) - set(existing_objects)
if len(objects) > 0:
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 0f9bbba..dc9039d 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -530,3 +530,21 @@ class TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase):
mock.call(TEST_BUCKET, 'test_object/file3.json', DESTINATION_BUCKET, 'test_object/file3.json'),
]
mock_hook.return_value.rewrite.assert_has_calls(mock_calls_none)
+
+ @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
+ def test_execute_wildcard_with_replace_flag_false_with_destination_object(self, mock_hook):
+ operator = GCSToGCSOperator(
+ task_id=TASK_ID,
+ source_bucket=TEST_BUCKET,
+ source_object=SOURCE_OBJECT_WILDCARD_SUFFIX,
+ destination_bucket=DESTINATION_BUCKET,
+ destination_object=DESTINATION_OBJECT_PREFIX,
+ replace=False,
+ )
+
+ operator.execute(None)
+ mock_calls = [
+ mock.call(TEST_BUCKET, prefix="test_object", delimiter=""),
+ mock.call(DESTINATION_BUCKET, prefix="foo/bar", delimiter=""),
+ ]
+ mock_hook.return_value.list.assert_has_calls(mock_calls)