You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/11/27 19:35:12 UTC

incubator-airflow git commit: [AIRFLOW-1843] Add Google Cloud Storage Sensor with prefix

Repository: incubator-airflow
Updated Branches:
  refs/heads/master eff68882b -> d8115e982


[AIRFLOW-1843] Add Google Cloud Storage Sensor with prefix

Sensor for checking if there are any files in a bucket
at a certain prefix

Closes #2809 from litdeviant/gcs_prefix_sensor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8115e98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8115e98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8115e98

Branch: refs/heads/master
Commit: d8115e982b19e39fb96362c1acb3bf1715bc9180
Parents: eff6888
Author: Igors Vaitkus <li...@protonmail.com>
Authored: Mon Nov 27 11:35:01 2017 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Nov 27 11:35:01 2017 -0800

----------------------------------------------------------------------
 airflow/contrib/sensors/gcs_sensor.py | 45 ++++++++++++++++++++++++++++++
 1 file changed, 45 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8115e98/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index 384e26f..a45923a 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -121,3 +121,48 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
             google_cloud_storage_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to)
         return hook.is_updated_after(self.bucket, self.object, self.ts_func(context))
+
+
+class GoogleCloudStoragePrefixSensor(BaseSensorOperator):
+    """
+    Checks for the existence of a files at prefix in Google Cloud Storage bucket.
+    """
+    template_fields = ('bucket', 'prefix')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(
+        self,
+        bucket,
+        prefix,
+        google_cloud_conn_id='google_cloud_storage_default',
+        delegate_to=None,
+        *args,
+        **kwargs):
+        """
+        Create a new GoogleCloudStorageObjectSensor.
+
+        :param bucket: The Google cloud storage bucket where the object is.
+        :type bucket: string
+        :param prefix: The name of the prefix to check in the Google cloud
+            storage bucket.
+        :type prefix: string
+        :param google_cloud_storage_conn_id: The connection ID to use when
+            connecting to Google cloud storage.
+        :type google_cloud_storage_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(GoogleCloudStoragePrefixSensor, self).__init__(*args, **kwargs)
+        self.bucket = bucket
+        self.prefix = prefix
+        self.google_cloud_conn_id = google_cloud_conn_id
+        self.delegate_to = delegate_to
+
+    def poke(self, context):
+        self.log.info('Sensor checks existence of objects: %s, %s', self.bucket, self.prefix)
+        hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.google_cloud_conn_id,
+            delegate_to=self.delegate_to)
+        return bool(hook.list(self.bucket, prefix=self.prefix))