You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/10/25 17:34:42 UTC

incubator-airflow git commit: [AIRFLOW-588] Add Google Cloud Storage Object sensor[]

Repository: incubator-airflow
Updated Branches:
  refs/heads/master c49d0b36b -> 61370fbf7


[AIRFLOW-588] Add Google Cloud Storage Object sensor[]

The Cloud Storage sensor will check for the
existence of an object in
 a bucket. It will wait till the object exists
before continuing.

Closes #1849 from alexvanboxel/feature/airflow-588
-gcs-sensor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/61370fbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/61370fbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/61370fbf

Branch: refs/heads/master
Commit: 61370fbf7c1a7679d45b70ffa3ca438c815ad56a
Parents: c49d0b3
Author: Alex Van Boxel <al...@vanboxel.be>
Authored: Tue Oct 25 10:34:34 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Tue Oct 25 10:34:34 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/gcs_hook.py     | 29 +++++++++++++-
 airflow/contrib/sensors/gcs_sensor.py | 64 ++++++++++++++++++++++++++++++
 2 files changed, 91 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61370fbf/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index 0bbecba..a728559 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -15,9 +15,11 @@
 
 import logging
 
-from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 from apiclient.discovery import build
 from apiclient.http import MediaFileUpload
+from googleapiclient import errors
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
 logging.getLogger("google_cloud_storage").setLevel(logging.INFO)
 
@@ -31,7 +33,8 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
     def __init__(self,
                  google_cloud_storage_conn_id='google_cloud_storage_default',
                  delegate_to=None):
-        super(GoogleCloudStorageHook, self).__init__(google_cloud_storage_conn_id, delegate_to)
+        super(GoogleCloudStorageHook, self).__init__(google_cloud_storage_conn_id,
+                                                     delegate_to)
 
     def get_conn(self):
         """
@@ -84,3 +87,25 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
             .objects() \
             .insert(bucket=bucket, name=object, media_body=media) \
             .execute()
+
+    def exists(self, bucket, object):
+        """
+        Checks for the existence of a file in Google Cloud Storage.
+
+        :param bucket: The Google cloud storage bucket where the object is.
+        :type bucket: string
+        :param object: The name of the object to check in the Google cloud
+            storage bucket.
+        :type object: string
+        """
+        service = self.get_conn()
+        try:
+            service \
+                .objects() \
+                .get(bucket=bucket, object=object) \
+                .execute()
+            return True
+        except errors.HttpError as ex:
+            if ex.resp['status'] == '404':
+                return False
+            raise

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61370fbf/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
new file mode 100644
index 0000000..2d5f6af
--- /dev/null
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class GoogleCloudStorageObjectSensor(BaseSensorOperator):
+    """
+    Checks for the existence of a file in Google Cloud Storage.
+    """
+    template_fields = ('bucket', 'object')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            bucket,
+            object,
+            google_cloud_conn_id='google_cloud_storage_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new GoogleCloudStorageObjectSensor.
+
+        :param bucket: The Google cloud storage bucket where the object is.
+        :type bucket: string
+        :param object: The name of the object to check in the Google cloud
+            storage bucket.
+        :type object: string
+        :param google_cloud_conn_id: The connection ID to use when
+            connecting to Google cloud storage.
+        :type google_cloud_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(GoogleCloudStorageObjectSensor, self).__init__(*args, **kwargs)
+        self.bucket = bucket
+        self.object = object
+        self.google_cloud_conn_id = google_cloud_conn_id
+        self.delegate_to = delegate_to
+
+    def poke(self, context):
+        logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+        hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.google_cloud_conn_id,
+            delegate_to=self.delegate_to)
+        return hook.exists(self.bucket, self.object)