You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/26 10:28:20 UTC
incubator-airflow git commit: [AIRFLOW-2033] Add Google Cloud Storage
List Operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 55f267492 -> f9ddb36df
[AIRFLOW-2033] Add Google Cloud Storage List Operator
Added an operator to get object names in a GCS
bucket filtered by prefix and delimiter with
example.
Closes #2974 from kaxil/gcs_list_op
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f9ddb36d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f9ddb36d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f9ddb36d
Branch: refs/heads/master
Commit: f9ddb36df1676afea22125644c241c8ba65598db
Parents: 55f2674
Author: Kaxil Naik <ka...@gmail.com>
Authored: Fri Jan 26 11:28:14 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Jan 26 11:28:14 2018 +0100
----------------------------------------------------------------------
airflow/contrib/operators/gcs_list_operator.py | 85 ++++++++++++++++++++
.../contrib/operators/test_gcs_list_operator.py | 45 +++++++++++
2 files changed, 130 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f9ddb36d/airflow/contrib/operators/gcs_list_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_list_operator.py b/airflow/contrib/operators/gcs_list_operator.py
new file mode 100644
index 0000000..e991766
--- /dev/null
+++ b/airflow/contrib/operators/gcs_list_operator.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class GoogleCloudStorageListOperator(BaseOperator):
+ """
+ List all objects from the bucket with the given string prefix and delimiter in name.
+
+ This operator returns a python list with the name of objects which can be used by
+ `xcom` in the downstream task.
+
+ :param bucket: The Google cloud storage bucket to find the objects.
+ :type bucket: string
+ :param prefix: Prefix string which filters objects whose names begin with this prefix.
+ :type prefix: string
+ :param delimiter: The delimiter by which you want to filter the objects.
+ For example, to list the CSV files in a directory in GCS you would use
+ delimiter='.csv'.
+ :type delimiter: string
+ :param google_cloud_storage_conn_id: The connection ID to use when
+ connecting to Google cloud storage.
+ :type google_cloud_storage_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have
+ domain-wide delegation enabled.
+ :type delegate_to: string
+
+ Example: The following Operator would list all the Avro files from the
+ `sales/sales-2017` folder in the `data` bucket.
+
+ GCS_Files = GoogleCloudStorageListOperator(
+ task_id='GCS_Files',
+ bucket='data',
+ prefix='sales/sales-2017/',
+ delimiter='.avro',
+ google_cloud_storage_conn_id=google_cloud_conn_id
+ )
+ """
+ template_fields = ('bucket', 'prefix', 'delimiter')
+ ui_color = '#f0eee4'
+
+ @apply_defaults
+ def __init__(self,
+ bucket,
+ prefix=None,
+ delimiter=None,
+ google_cloud_storage_conn_id='google_cloud_storage_default',
+ delegate_to=None,
+ *args,
+ **kwargs):
+ super(GoogleCloudStorageListOperator, self).__init__(*args, **kwargs)
+ self.bucket = bucket
+ self.prefix = prefix
+ self.delimiter = delimiter
+ self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+ self.delegate_to = delegate_to
+
+ def execute(self, context):
+
+ hook = GoogleCloudStorageHook(
+ google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+ delegate_to=self.delegate_to
+ )
+
+ self.log.info('Getting list of the files. Bucket: %s; Delimiter: %s; Prefix: %s',
+ self.bucket, self.delimiter, self.prefix)
+
+ return hook.list(bucket=self.bucket,
+ prefix=self.prefix,
+ delimiter=self.delimiter)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f9ddb36d/tests/contrib/operators/test_gcs_list_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_gcs_list_operator.py b/tests/contrib/operators/test_gcs_list_operator.py
new file mode 100644
index 0000000..cc2c253
--- /dev/null
+++ b/tests/contrib/operators/test_gcs_list_operator.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+TASK_ID = 'test-gcs-list-operator'
+TEST_BUCKET = 'test-bucket'
+DELIMITER = '.csv'
+PREFIX = 'TEST'
+
+
+class GoogleCloudStorageListOperatorTest(unittest.TestCase):
+
+ @mock.patch('airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageHook')
+ def test_execute(self, mock_hook):
+ operator = GoogleCloudStorageListOperator(task_id=TASK_ID,
+ bucket=TEST_BUCKET,
+ prefix=PREFIX,
+ delimiter=DELIMITER)
+
+ operator.execute(None)
+ mock_hook.return_value.list.assert_called_once_with(
+ bucket=TEST_BUCKET, prefix=PREFIX, delimiter=DELIMITER
+ )