You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/09/05 09:21:59 UTC

[GitHub] [airflow] nuclearpinguin commented on a change in pull request #6011: [AIRFLOW-2842] Add GoogleCloudStorageSynchronizeBuckets operator

nuclearpinguin commented on a change in pull request #6011: [AIRFLOW-2842] Add GoogleCloudStorageSynchronizeBuckets operator
URL: https://github.com/apache/airflow/pull/6011#discussion_r321152667
 
 

 ##########
 File path: airflow/contrib/hooks/gcs_hook.py
 ##########
 @@ -564,6 +564,167 @@ def compose(self, bucket_name, source_objects, destination_object):
 
         self.log.info("Completed successfully.")
 
+    def sync(
+        self,
+        source_bucket: str,
+        destination_bucket: str,
+        source_object: Optional[str] = None,
+        destination_object: Optional[str] = None,
+        recursive: bool = True,
+        allow_overwrite: bool = False,
+        delete_extra_files: bool = False
+    ):
+        """
+        Synchronizes the contents of the buckets.
+
+        Parameters ``source_object`` and ``destination_object`` describe the root sync directory. If they are
+        not passed, the entire bucket will be synchronized. They should point to directories.
+
+        .. note::
+            The synchronization of individual files is not supported. Only entire directories can be
+            synchronized.
+
+        :param source_bucket: The name of the bucket containing the source objects.
+        :type source_bucket: str
+        :param destination_bucket: The name of the bucket containing the destination objects.
+        :type destination_bucket: str
+        :param source_object: The root sync directory in the source bucket.
+        :type source_object: Optional[str]
+        :param destination_object: The root sync directory in the destination bucket.
+        :type destination_object: Optional[str]
+        :param recursive: If True, subdirectories will be considered
+        :type recursive: bool
+        :param recursive: If True, subdirectories will be considered
+        :type recursive: bool
+        :param allow_overwrite: if True, the files will be overwritten if a mismatched file is found.
+            By default, overwriting files is not allowed
+        :type allow_overwrite: bool
+        :param delete_extra_files: if True, deletes additional files from the source that are not found
+            in the destination. By default extra files are not deleted.
+
+            .. note::
+                This option can delete data quickly if you specify the wrong source/destination combination.
+
+        :type delete_extra_files: bool
+        :return: none
+        """
+        client = self.get_conn()
+        # Create bucket object
+        source_bucket_obj = client.bucket(source_bucket)
+        destination_bucket_obj = client.bucket(destination_bucket)
+        # Normalize parameters when they are passed
+        source_object = (
+            source_object + "/" if source_object and not source_object.endswith("/") else source_object
+        )
+        destination_object = (
+            destination_object + "/"
+            if destination_object and not destination_object.endswith("/")
+            else destination_object
+        )
+        # Calculate the number of characters to remove from the name, because they contain information
+        # about the parent's path
+        source_object_prefix_len = len(source_object) if source_object else 0
+        # Prepare synchronization plan
+        to_copy_blobs, to_delete_blobs, to_rewrite_blobs = self._prepare_sync_plan(
+            source_bucket=source_bucket_obj,
+            destination_bucket=destination_bucket_obj,
+            source_object=source_object,
+            destination_object=destination_object,
+            recursive=recursive
+        )
+        self.log.info(
+            "Planned synchronization. To delete blobs count: %s, to upload blobs count: %s, "
+            "to rewrite blobs count: %s",
+            len(to_delete_blobs),
+            len(to_copy_blobs),
+            len(to_rewrite_blobs),
+        )
+
+        # Copy missing object to new bucket
+        if to_copy_blobs:
+            for blob in to_copy_blobs:
+                dst_object = (
+                    path.join(destination_object, blob.name[source_object_prefix_len:])
+                    if destination_object
+                    else blob.name[source_object_prefix_len:]
+                )
+                self.copy(
+                    source_bucket=source_bucket_obj.name,
+                    source_object=blob.name,
+                    destination_bucket=destination_bucket_obj.name,
+                    destination_object=dst_object,
+                )
+            self.log.info("Blobs copied.")
+        # Delete redundant files
+        if to_delete_blobs:
+            if delete_extra_files:
+                # TODO: Add batch. I tried to do it, but the Google library is not stable at the moment.
+                for blob in to_delete_blobs:
+                    self.delete(blob.bucket.name, blob.name)
+                self.log.info("Blobs deleted.")
+            else:
+                self.log.info("Skipped blobs deleting")
+
+        # Overwrite files that are different
+        if to_rewrite_blobs:
+            if allow_overwrite:
+                for blob in to_rewrite_blobs:
+                    dst_object = (
+                        path.join(destination_object, blob.name[source_object_prefix_len:])
+                        if destination_object
+                        else blob.name[source_object_prefix_len:]
+                    )
+                    self.rewrite(
+                        source_bucket=source_bucket_obj.name,
+                        source_object=blob.name,
+                        destination_bucket=destination_bucket_obj.name,
+                        destination_object=dst_object,
+                    )
+                self.log.info("Blob rewritten.")
 
 Review comment:
   ```suggestion
                   self.log.info("Blobs rewritten.")
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services