You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/10/25 17:34:42 UTC
incubator-airflow git commit: [AIRFLOW-588] Add Google Cloud Storage
Object sensor
Repository: incubator-airflow
Updated Branches:
refs/heads/master c49d0b36b -> 61370fbf7
[AIRFLOW-588] Add Google Cloud Storage Object sensor
The Cloud Storage sensor will check for the
existence of an object in
a bucket. It will wait until the object exists
before continuing.
Closes #1849 from alexvanboxel/feature/airflow-588-gcs-sensor
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/61370fbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/61370fbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/61370fbf
Branch: refs/heads/master
Commit: 61370fbf7c1a7679d45b70ffa3ca438c815ad56a
Parents: c49d0b3
Author: Alex Van Boxel <al...@vanboxel.be>
Authored: Tue Oct 25 10:34:34 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Tue Oct 25 10:34:34 2016 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/gcs_hook.py | 29 +++++++++++++-
airflow/contrib/sensors/gcs_sensor.py | 64 ++++++++++++++++++++++++++++++
2 files changed, 91 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61370fbf/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index 0bbecba..a728559 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -15,9 +15,11 @@
import logging
-from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from apiclient.discovery import build
from apiclient.http import MediaFileUpload
+from googleapiclient import errors
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
logging.getLogger("google_cloud_storage").setLevel(logging.INFO)
@@ -31,7 +33,8 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
def __init__(self,
google_cloud_storage_conn_id='google_cloud_storage_default',
delegate_to=None):
- super(GoogleCloudStorageHook, self).__init__(google_cloud_storage_conn_id, delegate_to)
+ super(GoogleCloudStorageHook, self).__init__(google_cloud_storage_conn_id,
+ delegate_to)
def get_conn(self):
"""
@@ -84,3 +87,25 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
.objects() \
.insert(bucket=bucket, name=object, media_body=media) \
.execute()
+
+ def exists(self, bucket, object):
+ """
+ Checks for the existence of a file in Google Cloud Storage.
+
+ :param bucket: The Google cloud storage bucket where the object is.
+ :type bucket: string
+ :param object: The name of the object to check in the Google cloud
+ storage bucket.
+ :type object: string
+ """
+ service = self.get_conn()
+ try:
+ service \
+ .objects() \
+ .get(bucket=bucket, object=object) \
+ .execute()
+ return True
+ except errors.HttpError as ex:
+ if ex.resp['status'] == '404':
+ return False
+ raise
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/61370fbf/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
new file mode 100644
index 0000000..2d5f6af
--- /dev/null
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class GoogleCloudStorageObjectSensor(BaseSensorOperator):
+ """
+ Checks for the existence of a file in Google Cloud Storage.
+ """
+ template_fields = ('bucket', 'object')
+ ui_color = '#f0eee4'
+
+ @apply_defaults
+ def __init__(
+ self,
+ bucket,
+ object,
+ google_cloud_conn_id='google_cloud_storage_default',
+ delegate_to=None,
+ *args,
+ **kwargs):
+ """
+ Create a new GoogleCloudStorageDownloadOperator.
+
+ :param bucket: The Google cloud storage bucket where the object is.
+ :type bucket: string
+ :param object: The name of the object to check in the Google cloud
+ storage bucket.
+ :type object: string
+ :param google_cloud_storage_conn_id: The connection ID to use when
+ connecting to Google cloud storage.
+ :type google_cloud_storage_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide delegation enabled.
+ :type delegate_to: string
+ """
+ super(GoogleCloudStorageObjectSensor, self).__init__(*args, **kwargs)
+ self.bucket = bucket
+ self.object = object
+ self.google_cloud_conn_id = google_cloud_conn_id
+ self.delegate_to = delegate_to
+
+ def poke(self, context):
+ logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+ hook = GoogleCloudStorageHook(
+ google_cloud_storage_conn_id=self.google_cloud_conn_id,
+ delegate_to=self.delegate_to)
+ return hook.exists(self.bucket, self.object)