Posted to commits@airflow.apache.org by cr...@apache.org on 2018/01/03 20:48:10 UTC
incubator-airflow git commit: [AIRFLOW-1946][AIRFLOW-1855] Create a BigQuery Get Data Operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master cc9295fe3 -> 07c2a515e
[AIRFLOW-1946][AIRFLOW-1855] Create a BigQuery Get Data Operator
Closes #2896 from kaxil/patch-4
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/07c2a515
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/07c2a515
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/07c2a515
Branch: refs/heads/master
Commit: 07c2a515efd86169191642df17167dbef90d2d74
Parents: cc9295f
Author: Kaxil Naik <ka...@gmail.com>
Authored: Wed Jan 3 12:48:01 2018 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Jan 3 12:48:01 2018 -0800
----------------------------------------------------------------------
airflow/contrib/operators/bigquery_get_data.py | 112 ++++++++++++++++++++
docs/code.rst | 3 +
2 files changed, 115 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07c2a515/airflow/contrib/operators/bigquery_get_data.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_get_data.py b/airflow/contrib/operators/bigquery_get_data.py
new file mode 100644
index 0000000..b3b25f2
--- /dev/null
+++ b/airflow/contrib/operators/bigquery_get_data.py
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class BigQueryGetDataOperator(BaseOperator):
+ """
+ Fetches data from a BigQuery table (or optionally from selected columns) and
+ returns it in a Python list. The number of elements in the returned list will
+ be equal to the number of rows fetched. Each element in the list will again
+ be a list, where each inner element represents the column values for that row.
+
+ Example Result: [['Tony', '10'], ['Mike', '20'], ['Steve', '15']]
+
+ Note: If you pass fields to `selected_fields` in a different order than
+ the order of the columns in the BQ table, the data will still be returned
+ in the order of the BQ table.
+ For example, if the BQ table has three columns [A, B, C] and you pass
+ 'B,A' in `selected_fields`, the data would still be of the form
+ 'A,B'.
+
+ Example:
+
+ get_data = BigQueryGetDataOperator(
+ task_id='get_data_from_bq',
+ dataset_id='test_dataset',
+ table_id='Transaction_partitions',
+ max_results='100',
+ # selected_fields='DATE',
+ bigquery_conn_id='airflow-service-account'
+ )
+
+ :param dataset_id: The dataset ID of the requested table.
+ :type dataset_id: string
+ :param table_id: The table ID of the requested table.
+ :type table_id: string
+ :param max_results: The maximum number of records (rows) to be fetched
+ from the table.
+ :type max_results: string
+ :param selected_fields: List of fields to return (comma-separated). If
+ unspecified, all fields are returned.
+ :type selected_fields: string
+ :param bigquery_conn_id: reference to a specific BigQuery hook.
+ :type bigquery_conn_id: string
+ :param delegate_to: The account to impersonate, if any.
+ For this to work, the service account making the request must have domain-wide
+ delegation enabled.
+ :type delegate_to: string
+ """
+ template_fields = ('dataset_id', 'table_id', 'max_results')
+ ui_color = '#e4f0e8'
+
+ @apply_defaults
+ def __init__(self,
+ dataset_id,
+ table_id,
+ max_results='100',
+ selected_fields=None,
+ bigquery_conn_id='bigquery_default',
+ delegate_to=None,
+ *args,
+ **kwargs):
+ super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
+ self.dataset_id = dataset_id
+ self.table_id = table_id
+ self.max_results = max_results
+ self.selected_fields = selected_fields
+ self.bigquery_conn_id = bigquery_conn_id
+ self.delegate_to = delegate_to
+
+ def execute(self, context):
+ logging.info('Fetching Data from:')
+ logging.info('Dataset: %s ; Table: %s ; Max Results: %s',
+ self.dataset_id, self.table_id, self.max_results)
+
+ hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
+ delegate_to=self.delegate_to)
+
+ conn = hook.get_conn()
+ cursor = conn.cursor()
+ response = cursor.get_tabledata(dataset_id=self.dataset_id,
+ table_id=self.table_id,
+ max_results=self.max_results,
+ selected_fields=self.selected_fields)
+
+ logging.info('Total Extracted rows: %s', response['totalRows'])
+ rows = response['rows']
+
+ table_data = []
+ for dict_row in rows:
+ single_row = []
+ for fields in dict_row['f']:
+ single_row.append(fields['v'])
+ table_data.append(single_row)
+
+ return table_data
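----------------------------------------------------------------------
For reference, a minimal, self-contained sketch of the flattening that
execute() performs above, assuming the response follows the BigQuery
tabledata.list shape (rows under 'rows', each row's cells under 'f', each
cell's value under 'v'); the sample response below is hypothetical:

    # Hypothetical response shaped like the BigQuery tabledata.list output.
    response = {
        'totalRows': '3',
        'rows': [
            {'f': [{'v': 'Tony'}, {'v': '10'}]},
            {'f': [{'v': 'Mike'}, {'v': '20'}]},
            {'f': [{'v': 'Steve'}, {'v': '15'}]},
        ],
    }

    table_data = []
    for dict_row in response['rows']:
        # Collect this row's cell values into a flat list.
        single_row = [fields['v'] for fields in dict_row['f']]
        table_data.append(single_row)

    print(table_data)  # [['Tony', '10'], ['Mike', '20'], ['Steve', '15']]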
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07c2a515/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index e7b7a51..045e5a4 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -92,12 +92,15 @@ Community-contributed Operators
Use :code:`from airflow.operators.bash_operator import BashOperator` instead.
.. autoclass:: airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor
+.. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator
.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
.. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
.. autoclass:: airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator
.. autoclass:: airflow.contrib.operators.ecs_operator.ECSOperator
.. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator
+.. autoclass:: airflow.contrib.operators.gcs_copy_operator.GoogleCloudStorageCopyOperator
.. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator
+.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator
.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator
.. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator
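----------------------------------------------------------------------
Because execute() returns the extracted rows, the operator's result is
pushed to XCom, so a downstream task can consume it. A minimal sketch,
assuming the get_data task from the docstring example and an existing dag
object; the process_rows callable and the task_id 'process_bq_rows' are
hypothetical:

    from airflow.operators.python_operator import PythonOperator

    def process_rows(**context):
        # Pull the list returned by BigQueryGetDataOperator.execute().
        rows = context['ti'].xcom_pull(task_ids='get_data_from_bq')
        for row in rows:
            print(row)  # e.g. ['Tony', '10']

    process = PythonOperator(
        task_id='process_bq_rows',
        python_callable=process_rows,
        provide_context=True,  # pass the context dict to the callable
        dag=dag,  # assumes an existing DAG object
    )
    get_data >> process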