You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/01/26 10:28:20 UTC

incubator-airflow git commit: [AIRFLOW-2033] Add Google Cloud Storage List Operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 55f267492 -> f9ddb36df


[AIRFLOW-2033] Add Google Cloud Storage List Operator

Added an operator to get object names in a GCS
bucket, filtered by prefix and delimiter, with an
example.

Closes #2974 from kaxil/gcs_list_op


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f9ddb36d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f9ddb36d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f9ddb36d

Branch: refs/heads/master
Commit: f9ddb36df1676afea22125644c241c8ba65598db
Parents: 55f2674
Author: Kaxil Naik <ka...@gmail.com>
Authored: Fri Jan 26 11:28:14 2018 +0100
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Fri Jan 26 11:28:14 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/gcs_list_operator.py  | 85 ++++++++++++++++++++
 .../contrib/operators/test_gcs_list_operator.py | 45 +++++++++++
 2 files changed, 130 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f9ddb36d/airflow/contrib/operators/gcs_list_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_list_operator.py b/airflow/contrib/operators/gcs_list_operator.py
new file mode 100644
index 0000000..e991766
--- /dev/null
+++ b/airflow/contrib/operators/gcs_list_operator.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class GoogleCloudStorageListOperator(BaseOperator):
+    """
+    List all objects from the bucket with the give string prefix and delimiter in name.
+
+    This operator returns a python list with the name of objects which can be used by
+     `xcom` in the downstream task.
+
+    :param bucket: The Google cloud storage bucket to find the objects.
+    :type bucket: string
+    :param prefix: Prefix string which filters objects whose name begin with this prefix
+    :type prefix: string
+    :param delimiter: The delimiter by which you want to filter the objects.
+        For e.g to lists the CSV files from in a directory in GCS you would use
+        delimiter='.csv'.
+    :type delimiter: string
+    :param google_cloud_storage_conn_id: The connection ID to use when
+        connecting to Google cloud storage.
+    :type google_cloud_storage_conn_id: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: string
+
+    Example: The following Operator would list all the Avro files from `sales/sales-2017`
+        folder in `data` bucket.
+
+    GCS_Files = GoogleCloudStorageListOperator(
+        task_id='GCS_Files',
+        bucket='data',
+        prefix='sales/sales-2017/',
+        delimiter='.avro',
+        google_cloud_storage_conn_id=google_cloud_conn_id
+    )
+    """
+    template_fields = ('bucket', 'prefix', 'delimiter')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket,
+                 prefix=None,
+                 delimiter=None,
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        super(GoogleCloudStorageListOperator, self).__init__(*args, **kwargs)
+        self.bucket = bucket
+        self.prefix = prefix
+        self.delimiter = delimiter
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+
+        hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+            delegate_to=self.delegate_to
+        )
+
+        self.log.info('Getting list of the files. Bucket: %s; Delimiter: %s; Prefix: %s',
+                      self.bucket, self.delimiter, self.prefix)
+
+        return hook.list(bucket=self.bucket,
+                         prefix=self.prefix,
+                         delimiter=self.delimiter)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f9ddb36d/tests/contrib/operators/test_gcs_list_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_gcs_list_operator.py b/tests/contrib/operators/test_gcs_list_operator.py
new file mode 100644
index 0000000..cc2c253
--- /dev/null
+++ b/tests/contrib/operators/test_gcs_list_operator.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+TASK_ID = 'test-gcs-list-operator'  # fixture values; the GCS hook is mocked in the test
+TEST_BUCKET = 'test-bucket'
+DELIMITER = '.csv'
+PREFIX = 'TEST'
+
+
+class GoogleCloudStorageListOperatorTest(unittest.TestCase):
+
+    @mock.patch('airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageHook')
+    def test_execute(self, mock_hook):
+        operator = GoogleCloudStorageListOperator(task_id=TASK_ID,
+                                                  bucket=TEST_BUCKET,
+                                                  prefix=PREFIX,
+                                                  delimiter=DELIMITER)
+
+        operator.execute(None)
+        mock_hook.return_value.list.assert_called_once_with(
+            bucket=TEST_BUCKET, prefix=PREFIX, delimiter=DELIMITER
+        )