You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/12/01 23:51:57 UTC
incubator-airflow git commit: [AIRFLOW-1855][AIRFLOW-1866] Add GCS
Copy Operator to copy multiple files
Repository: incubator-airflow
Updated Branches:
refs/heads/master b9c82c040 -> 3e321790d
[AIRFLOW-1855][AIRFLOW-1866] Add GCS Copy Operator to copy multiple files
Closes #2819 from kaxil/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3e321790
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3e321790
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3e321790
Branch: refs/heads/master
Commit: 3e321790d537696b8fd1a97dcac2ab7c469fecea
Parents: b9c82c0
Author: Kaxil Naik <ka...@gmail.com>
Authored: Fri Dec 1 15:51:22 2017 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Dec 1 15:51:30 2017 -0800
----------------------------------------------------------------------
airflow/contrib/hooks/gcs_hook.py | 34 +++---
airflow/contrib/operators/gcs_copy_operator.py | 115 ++++++++++++++++++++
2 files changed, 135 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3e321790/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index 546e028..f6ad39f 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -38,7 +38,6 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
http_authorized = self._authorize()
return build('storage', 'v1', http=http_authorized)
-
# pylint:disable=redefined-builtin
def copy(self, source_bucket, source_object, destination_bucket=None,
destination_object=None):
@@ -48,10 +47,10 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
destination_bucket or destination_object can be omitted, in which case
source bucket/object is used, but not both.
- :param bucket: The bucket of the object to copy from.
- :type bucket: string
- :param object: The object to copy.
- :type object: string
+ :param source_bucket: The bucket of the object to copy from.
+ :type source_bucket: string
+ :param source_object: The object to copy.
+ :type source_object: string
:param destination_bucket: The destination of the object to copied to.
Can be omitted; then the same bucket is used.
:type destination_bucket: string
@@ -219,7 +218,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
return False
raise
- def list(self, bucket, versions=None, maxResults=None, prefix=None):
+ def list(self, bucket, versions=None, maxResults=None, prefix=None, delimiter=None):
"""
List all objects from the bucket with the give string prefix in name
@@ -231,6 +230,8 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
:type maxResults: integer
:param prefix: prefix string which filters objects whose name begin with this prefix
:type prefix: string
+ :param delimiter: filters objects based on the delimiter (e.g. '.csv')
+ :type delimiter: string
:return: a stream of object names matching the filtering criteria
"""
service = self.get_conn()
@@ -243,16 +244,21 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
versions=versions,
maxResults=maxResults,
pageToken=pageToken,
- prefix=prefix
+ prefix=prefix,
+ delimiter=delimiter
).execute()
- if 'items' not in response:
- self.log.info("No items found for prefix: %s", prefix)
- break
-
- for item in response['items']:
- if item and 'name' in item:
- ids.append(item['name'])
+ if 'prefixes' not in response:
+ if 'items' not in response:
+ self.log.info("No items found for prefix: %s", prefix)
+ break
+
+ for item in response['items']:
+ if item and 'name' in item:
+ ids.append(item['name'])
+ else:
+ for item in response['prefixes']:
+ ids.append(item)
if 'nextPageToken' not in response:
# no further pages of results, so stop the loop
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3e321790/airflow/contrib/operators/gcs_copy_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_copy_operator.py b/airflow/contrib/operators/gcs_copy_operator.py
new file mode 100644
index 0000000..55d98a3
--- /dev/null
+++ b/airflow/contrib/operators/gcs_copy_operator.py
@@ -0,0 +1,115 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class GoogleCloudStorageCopyOperator(BaseOperator):
+    """
+    Copies objects (optionally from a directory) filtered by 'delimiter' (file extension, e.g. .json)
+    from a bucket to another bucket, optionally under a different directory.
+
+    :param source_bucket: The source Google cloud storage bucket where the object is.
+    :type source_bucket: string
+    :param source_object: The source name of the object to copy in the Google cloud
+        storage bucket. It is passed to the hook as the listing prefix.
+    :type source_object: string
+    :param source_files_delimiter: The delimiter by which you want to filter the files to copy.
+        E.g. to copy the CSV files in a directory in GCS you would use source_files_delimiter='.csv'.
+    :type source_files_delimiter: string
+    :param destination_bucket: The destination Google cloud storage bucket where the object should be.
+    :type destination_bucket: string
+    :param destination_directory: The destination name of the directory in the destination Google cloud
+        storage bucket. It is prepended verbatim (no separator is added) to each listed object name.
+    :type destination_directory: string
+    :param google_cloud_storage_conn_id: The connection ID to use when
+        connecting to Google cloud storage.
+    :type google_cloud_storage_conn_id: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide delegation enabled.
+    :type delegate_to: string
+
+    Example: The following Operator would copy all the CSV files from `sales/sales-2017` folder in `data` bucket
+    to the `archive` bucket, where each copy is named `sales/` followed by the full source object name.
+
+    copy_files = GoogleCloudStorageCopyOperator(
+        task_id='copy_files',
+        source_bucket='data',
+        source_object='sales/sales-2017/',
+        source_files_delimiter='.csv',
+        destination_bucket='archive',
+        destination_directory='sales/',
+        google_cloud_storage_conn_id='airflow-service-account'
+    )
+    """
+    template_fields = ('source_bucket', 'source_object', 'source_files_delimiter',
+                       'destination_bucket', 'destination_directory')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 source_bucket,
+                 source_object,
+                 source_files_delimiter=None,
+                 destination_bucket=None,
+                 destination_directory='',
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        super(GoogleCloudStorageCopyOperator, self).__init__(*args, **kwargs)
+        self.source_bucket = source_bucket
+        self.source_object = source_object
+        self.source_files_delimiter = source_files_delimiter
+        self.files_to_copy = []
+        self.destination_bucket = destination_bucket
+        self.destination_directory = destination_directory
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        """List the source objects matching the prefix/delimiter and copy each one to the destination."""
+        self.log.info('Executing copy - Source_Bucket: %s, Source_directory: %s, '
+                      'Destination_bucket: %s, Destination_directory: %s',
+                      self.source_bucket, self.source_object,
+                      self.destination_bucket or self.source_bucket,
+                      self.destination_directory or self.source_object)
+
+        hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+                                      delegate_to=self.delegate_to)
+
+        self.log.info('Getting list of the files to copy. Source Bucket: %s; Source Object: %s',
+                      self.source_bucket, self.source_object)
+
+        # Create a list of objects to copy from Source bucket. The function uses prefix keyword to pass the name of
+        # the object to copy.
+        self.files_to_copy = hook.list(bucket=self.source_bucket, prefix=self.source_object,
+                                       delimiter=self.source_files_delimiter)
+
+        # Log the names of all objects to be copied
+        self.log.info('Files to copy: %s', self.files_to_copy)
+
+        if self.files_to_copy:  # truthiness covers both None and an empty listing
+            for file_to_copy in self.files_to_copy:
+                self.log.info('Source_Bucket: %s, Source_Object: %s, '
+                              'Destination_bucket: %s, Destination_Directory: %s',
+                              self.source_bucket, file_to_copy,
+                              self.destination_bucket or self.source_bucket,
+                              self.destination_directory + file_to_copy)
+                hook.copy(self.source_bucket, file_to_copy,
+                          self.destination_bucket, self.destination_directory + file_to_copy)
+        else:
+            self.log.info('No Files to copy.')