You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by al...@apache.org on 2017/01/03 19:24:19 UTC

incubator-airflow git commit: [AIRFLOW-717] Add Cloud Storage updated sensor

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f939d7832 -> 9df4789a9


[AIRFLOW-717] Add Cloud Storage updated sensor

Add a Cloud Storage sensor that triggers when an
object is created or updated after a specific date.
Allow setting a callback that defines the update
requirements. The default is execution_date +
schedule_interval.

Closes #1959 from alexvanboxel/feature/gcp_sensor_updated


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9df4789a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9df4789a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9df4789a

Branch: refs/heads/master
Commit: 9df4789a9c30ba4ddfc94a01feb2db61743f9334
Parents: f939d78
Author: Alex Van Boxel <al...@vanboxel.be>
Authored: Tue Jan 3 20:24:07 2017 +0100
Committer: Alex Van Boxel <al...@vanboxel.be>
Committed: Tue Jan 3 20:24:07 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/gcs_hook.py     | 43 +++++++++++++++++++
 airflow/contrib/sensors/gcs_sensor.py | 66 +++++++++++++++++++++++++++++-
 2 files changed, 107 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9df4789a/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index a728559..dd3cd27 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -43,6 +43,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
         http_authorized = self._authorize()
         return build('storage', 'v1', http=http_authorized)
 
+    # pylint:disable=redefined-builtin
     def download(self, bucket, object, filename=False):
         """
         Get a file from Google Cloud Storage.
@@ -68,6 +69,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
 
         return downloaded_file_bytes
 
+    # pylint:disable=redefined-builtin
     def upload(self, bucket, object, filename, mime_type='application/octet-stream'):
         """
         Uploads a local file to Google Cloud Storage.
@@ -88,6 +90,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
             .insert(bucket=bucket, name=object, media_body=media) \
             .execute()
 
+    # pylint:disable=redefined-builtin
     def exists(self, bucket, object):
         """
         Checks for the existence of a file in Google Cloud Storage.
@@ -109,3 +112,43 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
             if ex.resp['status'] == '404':
                 return False
             raise
+
+    # pylint:disable=redefined-builtin
+    def is_updated_after(self, bucket, object, ts):
+        """
+        Checks if an object is updated in Google Cloud Storage.
+
+        :param bucket: The Google cloud storage bucket where the object is.
+        :type bucket: string
+        :param object: The name of the object to check in the Google cloud
+            storage bucket.
+        :type object: string
+        :param ts: The timestamp to check against.
+        :type ts: datetime
+        """
+        service = self.get_conn()
+        try:
+            response = (service
+                        .objects()
+                        .get(bucket=bucket, object=object)
+                        .execute())
+
+            if 'updated' in response:
+                import dateutil.parser
+                import dateutil.tz
+
+                if not ts.tzinfo:
+                    ts = ts.replace(tzinfo=dateutil.tz.tzutc())
+
+                updated = dateutil.parser.parse(response['updated'])
+                logging.log(logging.INFO, "Verify object date: " + str(updated)
+                            + " > " + str(ts))
+
+                if updated > ts:
+                    return True
+
+        except errors.HttpError as ex:
+            if ex.resp['status'] != '404':
+                raise
+
+        return False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9df4789a/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index 2d5f6af..c9d741b 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -30,13 +30,13 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator):
     def __init__(
             self,
             bucket,
-            object,
+            object,  # pylint:disable=redefined-builtin
             google_cloud_conn_id='google_cloud_storage_default',
             delegate_to=None,
             *args,
             **kwargs):
         """
-        Create a new GoogleCloudStorageDownloadOperator.
+        Create a new GoogleCloudStorageObjectSensor.
 
         :param bucket: The Google cloud storage bucket where the object is.
         :type bucket: string
@@ -62,3 +62,65 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator):
             google_cloud_storage_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to)
         return hook.exists(self.bucket, self.object)
+
+
+def ts_function(context):
+    """
+    Default callback for the GoogleCloudStorageObjectUpdatedSensor. The default
+    behaviour is check for the object being updated after execution_date +
+    schedule_interval.
+    """
+    return context['execution_date'] + context['dag'].schedule_interval
+
+
+class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
+    """
+    Checks if an object is updated in Google Cloud Storage.
+    """
+    template_fields = ('bucket', 'object')
+    template_ext = ('.sql',)
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            bucket,
+            object,  # pylint:disable=redefined-builtin
+            ts_func=ts_function,
+            google_cloud_conn_id='google_cloud_storage_default',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new GoogleCloudStorageObjectUpdatedSensor.
+
+        :param bucket: The Google cloud storage bucket where the object is.
+        :type bucket: string
+        :param object: The name of the object to download in the Google cloud
+            storage bucket.
+        :type object: string
+        :param ts_func: Callback for defining the update condition. The default callback
+            returns execution_date + schedule_interval. The callback takes the context
+            as parameter.
+        :type ts_func: function
+        :param google_cloud_storage_conn_id: The connection ID to use when
+            connecting to Google cloud storage.
+        :type google_cloud_storage_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have domain-wide
+            delegation enabled.
+        :type delegate_to: string
+        """
+        super(GoogleCloudStorageObjectUpdatedSensor, self).__init__(*args, **kwargs)
+        self.bucket = bucket
+        self.object = object
+        self.ts_func = ts_func
+        self.google_cloud_conn_id = google_cloud_conn_id
+        self.delegate_to = delegate_to
+
+    def poke(self, context):
+        logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+        hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.google_cloud_conn_id,
+            delegate_to=self.delegate_to)
+        return hook.is_updated_after(self.bucket, self.object, self.ts_func(context))