You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/06/23 19:40:09 UTC

incubator-airflow git commit: [AIRFLOW-1333] Enable copy function for Google Cloud Storage Hook

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4841e3ead -> e2c3831a4


[AIRFLOW-1333] Enable copy function for Google Cloud Storage Hook

Closes #2385 from yk5/gcs_hook_copy


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e2c3831a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e2c3831a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e2c3831a

Branch: refs/heads/master
Commit: e2c3831a482c131ca404efbbefcf92b177614ff4
Parents: 4841e3e
Author: Younghee Kwon <yo...@google.com>
Authored: Fri Jun 23 12:40:02 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Jun 23 12:40:02 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/gcs_hook.py | 48 ++++++++++++++++++++++++++++++++++
 1 file changed, 48 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e2c3831a/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index d38ceff..b5f3edc 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -43,6 +43,52 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
         http_authorized = self._authorize()
         return build('storage', 'v1', http=http_authorized)
 
+
+    # pylint:disable=redefined-builtin
+    def copy(self, source_bucket, source_object, destination_bucket=None,
+             destination_object=None):
+        """
+        Copies an object from a bucket to another, with renaming if requested.
+
+        Omitted destination_bucket/destination_object default to the source
+        values; ValueError if the source is empty or equals the destination.
+
+        :param source_bucket: The bucket of the object to copy from.
+        :type source_bucket: string
+        :param source_object: The object to copy.
+        :type source_object: string
+        :param destination_bucket: Bucket to copy to (default: source_bucket).
+        :type destination_bucket: string
+        :param destination_object: Target object name (default: source_object).
+        :type destination_object: string
+        :return: True if copied, False on HTTP 404; other HttpErrors re-raise.
+        """
+        # Check empty source first so it is not misreported as a same-object copy.
+        if not source_bucket or not source_object:
+            raise ValueError('source_bucket and source_object cannot be empty.')
+        destination_bucket = destination_bucket or source_bucket
+        destination_object = destination_object or source_object
+        if (source_bucket == destination_bucket and
+                source_object == destination_object):
+            raise ValueError(
+                'Either source/destination bucket or source/destination object '
+                'must be different, not both the same: bucket=%s, object=%s' %
+                (source_bucket, source_object))
+        service = self.get_conn()
+        try:
+            service \
+                .objects() \
+                .copy(sourceBucket=source_bucket, sourceObject=source_object,
+                      destinationBucket=destination_bucket,
+                      destinationObject=destination_object, body='') \
+                .execute()
+            return True
+        except errors.HttpError as ex:
+            if ex.resp['status'] == '404':
+                return False
+            raise
+
+
     # pylint:disable=redefined-builtin
     def download(self, bucket, object, filename=False):
         """
@@ -157,6 +203,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
         """
         Delete an object if versioning is not enabled for the bucket, or if generation
         parameter is used.
+
         :param bucket: name of the bucket, where the object resides
         :type bucket: string
         :param object: name of the object to delete
@@ -181,6 +228,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
     def list(self, bucket, versions=None, maxResults=None, prefix=None):
         """
         List all objects from the bucket with the give string prefix in name
+
         :param bucket: bucket name
         :type bucket: string
         :param versions: if true, list all versions of the objects