You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/07/25 20:55:11 UTC

[airflow] branch main updated: Fix GCStoGCS operator with replace disabled and existing destination object (#16991)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 966b250  Fix GCStoGCS operator with replace disabled and existing destination object (#16991)
966b250 is described below

commit 966b2501995279b7b5f2e1d0bf1c63a511dd382e
Author: aslantar <lo...@uni-muenster.de>
AuthorDate: Sun Jul 25 22:54:46 2021 +0200

    Fix GCStoGCS operator with replace disabled and existing destination object (#16991)
---
 airflow/providers/google/cloud/transfers/gcs_to_gcs.py | 14 +++++++++++++-
 .../google/cloud/transfers/test_gcs_to_gcs.py          | 18 ++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index 9ab4370..7656ad8 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -354,7 +354,19 @@ class GCSToGCSOperator(BaseOperator):
             # and only keep those files which are present in
             # Source GCS bucket and not in Destination GCS bucket
 
-            existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter)
+            if self.destination_object is None:
+                existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter)
+            else:
+                self.log.info("Replaced destination_object with source_object prefix.")
+                destination_objects = hook.list(
+                    self.destination_bucket,
+                    prefix=self.destination_object,
+                    delimiter=delimiter,
+                )
+                existing_objects = [
+                    dest_object.replace(self.destination_object, prefix_, 1)
+                    for dest_object in destination_objects
+                ]
 
             objects = set(objects) - set(existing_objects)
             if len(objects) > 0:
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 0f9bbba..dc9039d 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -530,3 +530,21 @@ class TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase):
             mock.call(TEST_BUCKET, 'test_object/file3.json', DESTINATION_BUCKET, 'test_object/file3.json'),
         ]
         mock_hook.return_value.rewrite.assert_has_calls(mock_calls_none)
+
+    @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')  # hook is mocked; no real GCS access
+    def test_execute_wildcard_with_replace_flag_false_with_destination_object(self, mock_hook):  # replace=False + explicit destination_object: destination must be listed under destination_object
+        operator = GCSToGCSOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_WILDCARD_SUFFIX,
+            destination_bucket=DESTINATION_BUCKET,
+            destination_object=DESTINATION_OBJECT_PREFIX,
+            replace=False,  # forces the "skip objects already in the destination" branch
+        )
+
+        operator.execute(None)  # no task context is supplied to execute()
+        mock_calls = [
+            mock.call(TEST_BUCKET, prefix="test_object", delimiter=""),  # source listing; assumes the wildcard constant yields prefix "test_object" -- TODO confirm
+            mock.call(DESTINATION_BUCKET, prefix="foo/bar", delimiter=""),  # destination listed by destination_object; presumably DESTINATION_OBJECT_PREFIX == "foo/bar" -- verify
+        ]
+        mock_hook.return_value.list.assert_has_calls(mock_calls)  # calls must appear consecutively, in this order, among list() calls