You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by al...@apache.org on 2017/01/09 20:46:26 UTC

incubator-airflow git commit: [AIRFLOW-728] Add Google BigQuery table sensor

Repository: incubator-airflow
Updated Branches:
  refs/heads/master e010cb29b -> 617ba7412


[AIRFLOW-728] Add Google BigQuery table sensor

Design a sensor that checks whether a certain
table is present in BigQuery. The sensor will
accept the Google Cloud project ID, BigQuery
dataset ID and BigQuery table ID to check as
parameters.

Internally, it will use the BigQuery hook to check
for the existence of the table.
Therefore, a 'table_exists' method will be added to
the existing BigQuery hook.

Closes #1970 from bodschut/feature/bq_sensor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/617ba741
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/617ba741
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/617ba741

Branch: refs/heads/master
Commit: 617ba741205ddea8461fc287267fc9c371ace2de
Parents: e010cb2
Author: Bob De Schutter <de...@gmail.com>
Authored: Mon Jan 9 21:46:16 2017 +0100
Committer: Alex Van Boxel <al...@vanboxel.be>
Committed: Mon Jan 9 21:46:16 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py     | 27 ++++++++++
 airflow/contrib/sensors/bigquery_sensor.py | 69 +++++++++++++++++++++++++
 2 files changed, 96 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/617ba741/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index d796565..53ca123 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -22,6 +22,7 @@ import logging
 import time
 
 from apiclient.discovery import build, HttpError
+from googleapiclient import errors
 from builtins import range
 from pandas.io.gbq import GbqConnector, \
     _parse_data as gbq_parse_data, \
@@ -100,6 +101,32 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
         else:
             return gbq_parse_data(schema, [])
 
+    def table_exists(self, project_id, dataset_id, table_id):
+        """
+        Checks for the existence of a table in Google BigQuery.
+
+        :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook
+        must provide access to the specified project.
+        :type project_id: string
+        :param dataset_id: The name of the dataset in which to look for the table.
+            storage bucket.
+        :type dataset_id: string
+        :param table_id: The name of the table to check the existence of.
+        :type table_id: string
+        """
+        service = self.get_service()
+        try:
+            service.tables().get(
+                projectId=project_id,
+                datasetId=dataset_id,
+                tableId=table_id
+            ).execute()
+            return True
+        except errors.HttpError as e:
+            if e.resp['status'] == '404':
+                return False
+            raise
+
 
 class BigQueryPandasConnector(GbqConnector):
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/617ba741/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
new file mode 100644
index 0000000..8a8ca62
--- /dev/null
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.utils.decorators import apply_defaults
+
+
+class BigQueryTableSensor(BaseSensorOperator):
+    """
+    Checks for the existence of a table in Google Bigquery.
+    """
+    template_fields = ('project_id', 'dataset_id', 'table_id',)
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(
+            self,
+            project_id,
+            dataset_id,
+            table_id,
+            bigquery_conn_id='bigquery_default_conn',
+            delegate_to=None,
+            *args,
+            **kwargs):
+        """
+        Create a new BigQueryTableSensor.
+
+        :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook
+        must provide access to the specified project.
+        :type project_id: string
+        :param dataset_id: The name of the dataset in which to look for the table.
+            storage bucket.
+        :type dataset_id: string
+        :param table_id: The name of the table to check the existence of.
+        :type table_id: string
+        :param bigquery_conn_id: The connection ID to use when connecting to Google BigQuery.
+        :type bigquery_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(BigQueryTableSensor, self).__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.dataset_id = dataset_id
+        self.table_id = table_id
+        self.bigquery_conn_id = bigquery_conn_id
+        self.delegate_to = delegate_to
+
+    def poke(self, context):
+        table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
+        logging.info('Sensor checks existence of table: %s', table_uri)
+        hook = BigQueryHook(
+            bigquery_conn_id=self.bigquery_conn_id,
+            delegate_to=self.delegate_to)
+        return hook.table_exists(self.project_id, self.dataset_id, self.table_id)