You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/11/27 19:35:12 UTC
incubator-airflow git commit: [AIRFLOW-1843] Add Google Cloud Storage
Sensor with prefix
Repository: incubator-airflow
Updated Branches:
refs/heads/master eff68882b -> d8115e982
[AIRFLOW-1843] Add Google Cloud Storage Sensor with prefix
Sensor for checking whether there are any files in a bucket
at a certain prefix
Closes #2809 from litdeviant/gcs_prefix_sensor
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8115e98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8115e98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8115e98
Branch: refs/heads/master
Commit: d8115e982b19e39fb96362c1acb3bf1715bc9180
Parents: eff6888
Author: Igors Vaitkus <li...@protonmail.com>
Authored: Mon Nov 27 11:35:01 2017 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Nov 27 11:35:01 2017 -0800
----------------------------------------------------------------------
airflow/contrib/sensors/gcs_sensor.py | 45 ++++++++++++++++++++++++++++++
1 file changed, 45 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8115e98/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index 384e26f..a45923a 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -121,3 +121,48 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
google_cloud_storage_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to)
return hook.is_updated_after(self.bucket, self.object, self.ts_func(context))
+
+
+class GoogleCloudStoragePrefixSensor(BaseSensorOperator):
+ """
+ Checks for the existence of files at a prefix in a Google Cloud Storage bucket.
+ """
+ template_fields = ('bucket', 'prefix')
+ ui_color = '#f0eee4'
+
+ @apply_defaults
+ def __init__(
+ self,
+ bucket,
+ prefix,
+ google_cloud_conn_id='google_cloud_storage_default',
+ delegate_to=None,
+ *args,
+ **kwargs):
+ """
+ Create a new GoogleCloudStoragePrefixSensor.
+
+ :param bucket: The Google cloud storage bucket to check for objects.
+ :type bucket: string
+ :param prefix: The name of the prefix to check in the Google cloud
+ storage bucket.
+ :type prefix: string
+ :param google_cloud_conn_id: The connection ID to use when
+ connecting to Google cloud storage.
+ :type google_cloud_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide delegation enabled.
+ :type delegate_to: string
+ """
+ super(GoogleCloudStoragePrefixSensor, self).__init__(*args, **kwargs)
+ self.bucket = bucket
+ self.prefix = prefix
+ self.google_cloud_conn_id = google_cloud_conn_id
+ self.delegate_to = delegate_to
+
+ def poke(self, context):
+ self.log.info('Sensor checks existence of objects: %s, %s', self.bucket, self.prefix)
+ hook = GoogleCloudStorageHook(
+ google_cloud_storage_conn_id=self.google_cloud_conn_id,
+ delegate_to=self.delegate_to)
+ # Truthiness of the listing result decides the outcome: an empty (or
+ # None) result from hook.list yields False, anything non-empty yields True.
+ return bool(hook.list(self.bucket, prefix=self.prefix))