You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by al...@apache.org on 2017/01/03 19:24:19 UTC
incubator-airflow git commit: [AIRFLOW-717] Add Cloud Storage updated sensor
Repository: incubator-airflow
Updated Branches:
refs/heads/master f939d7832 -> 9df4789a9
[AIRFLOW-717] Add Cloud Storage updated sensor
Add a Cloud Storage sensor that triggers when an object is created
or updated after a specific date. Allow setting a callback that
defines the update requirements. The default is
execution_date + schedule_interval.
Closes #1959 from alexvanboxel/feature/gcp_sensor_updated
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9df4789a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9df4789a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9df4789a
Branch: refs/heads/master
Commit: 9df4789a9c30ba4ddfc94a01feb2db61743f9334
Parents: f939d78
Author: Alex Van Boxel <al...@vanboxel.be>
Authored: Tue Jan 3 20:24:07 2017 +0100
Committer: Alex Van Boxel <al...@vanboxel.be>
Committed: Tue Jan 3 20:24:07 2017 +0100
----------------------------------------------------------------------
airflow/contrib/hooks/gcs_hook.py | 43 +++++++++++++++++++
airflow/contrib/sensors/gcs_sensor.py | 66 +++++++++++++++++++++++++++++-
2 files changed, 107 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9df4789a/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index a728559..dd3cd27 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -43,6 +43,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
http_authorized = self._authorize()
return build('storage', 'v1', http=http_authorized)
+ # pylint:disable=redefined-builtin
def download(self, bucket, object, filename=False):
"""
Get a file from Google Cloud Storage.
@@ -68,6 +69,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
return downloaded_file_bytes
+ # pylint:disable=redefined-builtin
def upload(self, bucket, object, filename, mime_type='application/octet-stream'):
"""
Uploads a local file to Google Cloud Storage.
@@ -88,6 +90,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
.insert(bucket=bucket, name=object, media_body=media) \
.execute()
+ # pylint:disable=redefined-builtin
def exists(self, bucket, object):
"""
Checks for the existence of a file in Google Cloud Storage.
@@ -109,3 +112,43 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
if ex.resp['status'] == '404':
return False
raise
+
# pylint:disable=redefined-builtin
def is_updated_after(self, bucket, object, ts):
    """
    Checks if an object was updated in Google Cloud Storage after a given time.

    :param bucket: The Google cloud storage bucket where the object is.
    :type bucket: string
    :param object: The name of the object to check in the Google cloud
        storage bucket.
    :type object: string
    :param ts: The timestamp to check against. A naive datetime is
        interpreted as UTC.
    :type ts: datetime
    :return: True if the object's ``updated`` metadata is strictly later
        than ``ts``. False if it is not, if the metadata has no ``updated``
        field, or if the object lookup fails with HTTP status 404.
    :rtype: bool
    :raises errors.HttpError: for any HTTP error other than a 404.
    """
    service = self.get_conn()
    try:
        # Only the API call can raise HttpError; keep the try body to it.
        response = (service
                    .objects()
                    .get(bucket=bucket, object=object)
                    .execute())
    except errors.HttpError as ex:
        if ex.resp['status'] != '404':
            raise
        # A missing object cannot have been updated.
        return False

    if 'updated' not in response:
        return False

    import dateutil.parser
    import dateutil.tz

    # The parsed ``updated`` value is expected to carry a UTC offset
    # (RFC 3339); comparing an aware and a naive datetime raises TypeError,
    # so a naive ``ts`` is pinned to UTC first.
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=dateutil.tz.tzutc())

    updated = dateutil.parser.parse(response['updated'])
    logging.info("Verify object date: %s > %s", updated, ts)
    return updated > ts
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9df4789a/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index 2d5f6af..c9d741b 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -30,13 +30,13 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator):
def __init__(
self,
bucket,
- object,
+ object, # pylint:disable=redefined-builtin
google_cloud_conn_id='google_cloud_storage_default',
delegate_to=None,
*args,
**kwargs):
"""
- Create a new GoogleCloudStorageDownloadOperator.
+ Create a new GoogleCloudStorageObjectSensor.
:param bucket: The Google cloud storage bucket where the object is.
:type bucket: string
@@ -62,3 +62,65 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator):
google_cloud_storage_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to)
return hook.exists(self.bucket, self.object)
+
+
def ts_function(context):
    """
    Default callback for the GoogleCloudStorageObjectUpdatedSensor.

    The default behaviour is to check for the object being updated after the
    DAG's next schedule following ``execution_date`` (for a timedelta
    schedule this is execution_date + schedule_interval).

    ``schedule_interval`` may also be a cron expression or preset string,
    which cannot be added to a datetime directly. The computation is
    therefore delegated to the DAG's ``following_schedule`` method, which is
    expected to handle both kinds of schedule.

    :param context: The task context; only ``dag`` and ``execution_date``
        are read.
    :type context: dict
    :return: The timestamp the object must be updated after.
    :rtype: datetime
    """
    return context['dag'].following_schedule(context['execution_date'])
+
+
class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
    """
    Checks if an object is updated in Google Cloud Storage.

    Each poke succeeds once the object's ``updated`` timestamp is later than
    the value returned by ``ts_func(context)``.
    """
    template_fields = ('bucket', 'object')
    # No template file extensions: with ('.sql',) a bucket/object value
    # ending in ".sql" would be rendered as a path to a template file
    # instead of being used as a literal object name.
    template_ext = ()
    ui_color = '#f0eee4'

    @apply_defaults
    def __init__(
            self,
            bucket,
            object,  # pylint:disable=redefined-builtin
            ts_func=ts_function,
            google_cloud_conn_id='google_cloud_storage_default',
            delegate_to=None,
            *args,
            **kwargs):
        """
        Create a new GoogleCloudStorageObjectUpdatedSensor.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: string
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: string
        :param ts_func: Callback defining the update condition. It takes the
            task context as its only parameter and returns the timestamp the
            object must be updated after. The default callback returns the
            DAG's next schedule after execution_date.
        :type ts_func: function
        :param google_cloud_conn_id: The connection ID to use when
            connecting to Google cloud storage.
        :type google_cloud_conn_id: string
        :param delegate_to: The account to impersonate, if any.
            For this to work, the service account making the request must
            have domain-wide delegation enabled.
        :type delegate_to: string
        """
        super(GoogleCloudStorageObjectUpdatedSensor, self).__init__(*args, **kwargs)
        self.bucket = bucket
        self.object = object
        self.ts_func = ts_func
        self.google_cloud_conn_id = google_cloud_conn_id
        self.delegate_to = delegate_to

    def poke(self, context):
        """
        Return True when the object was updated after ``ts_func(context)``.

        :param context: The task context, passed through to ``ts_func``.
        :type context: dict
        :rtype: bool
        """
        logging.info('Sensor checks if object is updated: %s, %s',
                     self.bucket, self.object)
        hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_conn_id,
            delegate_to=self.delegate_to)
        return hook.is_updated_after(self.bucket, self.object,
                                     self.ts_func(context))