You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2018/01/03 20:48:10 UTC

incubator-airflow git commit: [AIRFLOW-1946][AIRFLOW-1855] Create a BigQuery Get Data Operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master cc9295fe3 -> 07c2a515e


[AIRFLOW-1946][AIRFLOW-1855] Create a BigQuery Get Data Operator

Closes #2896 from kaxil/patch-4


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/07c2a515
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/07c2a515
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/07c2a515

Branch: refs/heads/master
Commit: 07c2a515efd86169191642df17167dbef90d2d74
Parents: cc9295f
Author: Kaxil Naik <ka...@gmail.com>
Authored: Wed Jan 3 12:48:01 2018 -0800
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Jan 3 12:48:01 2018 -0800

----------------------------------------------------------------------
 airflow/contrib/operators/bigquery_get_data.py | 112 ++++++++++++++++++++
 docs/code.rst                                  |   3 +
 2 files changed, 115 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07c2a515/airflow/contrib/operators/bigquery_get_data.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_get_data.py b/airflow/contrib/operators/bigquery_get_data.py
new file mode 100644
index 0000000..b3b25f2
--- /dev/null
+++ b/airflow/contrib/operators/bigquery_get_data.py
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.bigquery_hook import BigQueryHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class BigQueryGetDataOperator(BaseOperator):
+    """
+    Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
+    and returns data in a python list. The number of elements in the returned list will
+    be equal to the number of rows fetched. Each element in the list will again be a list
+    where each element represents a column value for that row.
+
+    Example Result: [['Tony', '10'], ['Mike', '20'], ['Steve', '15']]
+
+    Note: If you pass fields to `selected_fields` which are in a different order than
+          the order of columns already in the BQ table, the data will still be in the
+          order of the BQ table. For example, if the BQ table has 3 columns [A,B,C] and
+          you pass 'B,A' in `selected_fields`, the data would still be of the form 'A,B'.
+
+    Example:
+
+    get_data = BigQueryGetDataOperator(
+        task_id='get_data_from_bq',
+        dataset_id='test_dataset',
+        table_id='Transaction_partitions',
+        max_results='100',
+        # selected_fields='DATE',
+        bigquery_conn_id='airflow-service-account'
+    )
+
+    :param dataset_id: The dataset ID of the requested table.
+    :type dataset_id: string
+    :param table_id: The table ID of the requested table.
+    :type table_id: string
+    :param max_results: The maximum number of records (rows) to be fetched
+        from the table.
+    :type max_results: string
+    :param selected_fields: List of fields to return (comma-separated). If
+        unspecified, all fields are returned.
+    :type selected_fields: string
+    :param bigquery_conn_id: reference to a specific BigQuery hook.
+    :type bigquery_conn_id: string
+    :param delegate_to: The account to impersonate, if any. For this to work, the
+        service account making the request must have domain-wide delegation enabled.
+    :type delegate_to: string
+    """
+    # NOTE(review): `selected_fields` is not listed here, so presumably it is not templated.
+    template_fields = ('dataset_id', 'table_id', 'max_results')
+    ui_color = '#e4f0e8'
+
+    @apply_defaults
+    def __init__(self,
+                 dataset_id,
+                 table_id,
+                 max_results='100',
+                 selected_fields=None,
+                 bigquery_conn_id='bigquery_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
+        self.dataset_id = dataset_id
+        self.table_id = table_id
+        self.max_results = max_results
+        self.selected_fields = selected_fields
+        self.bigquery_conn_id = bigquery_conn_id
+        self.delegate_to = delegate_to
+
+    def execute(self, context):
+        """Fetch the table rows and return them as a list of per-row value lists."""
+        logging.info('Fetching Data from:')
+        logging.info('Dataset: %s ; Table: %s ; Max Results: %s',
+                     self.dataset_id, self.table_id, self.max_results)
+
+        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
+                            delegate_to=self.delegate_to)
+
+        conn = hook.get_conn()
+        cursor = conn.cursor()
+        response = cursor.get_tabledata(dataset_id=self.dataset_id,
+                                        table_id=self.table_id,
+                                        max_results=self.max_results,
+                                        selected_fields=self.selected_fields)
+
+        logging.info('Total Extracted rows: %s', response['totalRows'])
+        # NOTE(review): assumes 'rows' is always present; confirm for empty tables.
+        rows = response['rows']
+
+        table_data = []
+        for dict_row in rows:  # each row: {'f': [{'v': <cell value>}, ...]}
+            single_row = []
+            for fields in dict_row['f']:
+                single_row.append(fields['v'])
+            table_data.append(single_row)
+
+        return table_data

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07c2a515/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index e7b7a51..045e5a4 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -92,12 +92,15 @@ Community-contributed Operators
  Use :code:`from airflow.operators.bash_operator import BashOperator` instead.
 
 .. autoclass:: airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor
+.. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
 .. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
 .. autoclass:: airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator
 .. autoclass:: airflow.contrib.operators.ecs_operator.ECSOperator
 .. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator
+.. autoclass:: airflow.contrib.operators.gcs_copy_operator.GoogleCloudStorageCopyOperator
 .. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator
+.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator
 .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubSubscriptionCreateOperator