Posted to commits@airflow.apache.org by po...@apache.org on 2021/06/12 21:07:21 UTC

[airflow] branch main updated: Add BigQueryToMsSqlOperator (#15422)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f8f75e  Add BigQueryToMsSqlOperator (#15422)
7f8f75e is described below

commit 7f8f75eb80790d4be3167f5e1ffccc669a281d55
Author: Kanthi <su...@gmail.com>
AuthorDate: Sat Jun 12 17:07:06 2021 -0400

    Add BigQueryToMsSqlOperator (#15422)
---
 .../example_dags/example_bigquery_to_mssql.py      |  69 +++++++++
 .../google/cloud/transfers/bigquery_to_mssql.py    | 163 +++++++++++++++++++++
 airflow/providers/google/provider.yaml             |   3 +
 .../cloud/transfers/test_bigquery_to_mssql.py      |  51 +++++++
 4 files changed, 286 insertions(+)

diff --git a/airflow/providers/google/cloud/example_dags/example_bigquery_to_mssql.py b/airflow/providers/google/cloud/example_dags/example_bigquery_to_mssql.py
new file mode 100644
index 0000000..24754cb
--- /dev/null
+++ b/airflow/providers/google/cloud/example_dags/example_bigquery_to_mssql.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for Google BigQuery service.
+"""
+import os
+
+from airflow import models
+from airflow.providers.google.cloud.operators.bigquery import (
+    BigQueryCreateEmptyDatasetOperator,
+    BigQueryCreateEmptyTableOperator,
+    BigQueryDeleteDatasetOperator,
+)
+from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator
+from airflow.utils.dates import days_ago
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_transfer")
+TABLE = "table_42"
+DESTINATION_TABLE = "mssql_table_test"
+
+with models.DAG(
+    "example_bigquery_to_mssql",
+    schedule_interval=None,  # Override to match your needs
+    start_date=days_ago(1),
+    tags=["example"],
+) as dag:
+    bigquery_to_mssql = BigQueryToMsSqlOperator(
+        task_id="bigquery_to_mssql",
+        source_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE}",
+        mssql_table=DESTINATION_TABLE,
+        replace=False,
+    )
+
+    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
+
+    create_table = BigQueryCreateEmptyTableOperator(
+        task_id="create_table",
+        dataset_id=DATASET_NAME,
+        table_id=TABLE,
+        schema_fields=[
+            {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+            {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
+        ],
+    )
+    create_dataset >> create_table >> bigquery_to_mssql
+
+    delete_dataset = BigQueryDeleteDatasetOperator(
+        task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
+    )
+
+    bigquery_to_mssql >> delete_dataset
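[Editor's note] For anyone trying this example DAG locally: it assumes working
'google_cloud_default' and 'mssql_default' connections. A minimal sketch of
supplying them via environment variables, using Airflow's AIRFLOW_CONN_<CONN_ID>
URI mechanism and Google's Application Default Credentials; every host,
credential, and path below is a placeholder:

    import os

    # MSSQL connection as a URI; user, password, host, and database are placeholders.
    os.environ["AIRFLOW_CONN_MSSQL_DEFAULT"] = "mssql://user:password@mssql-host:1433/example_db"

    # The Google provider falls back to Application Default Credentials when the
    # default connection carries no explicit credentials; the key path is a placeholder.
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/keyfile.json"
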
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py b/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
new file mode 100644
index 0000000..b8ea166
--- /dev/null
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
@@ -0,0 +1,163 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google BigQuery to MSSQL operator."""
+from typing import Optional, Sequence, Union
+
+from google.cloud.bigquery.table import TableReference
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
+from airflow.utils.decorators import apply_defaults
+
+
+class BigQueryToMsSqlOperator(BaseOperator):
+    """
+    Fetches data from a BigQuery table (or from selected columns only)
+    and inserts it into an MSSQL table.
+
+    .. note::
+        If you pass fields to ``selected_fields`` in a different order than the
+        order of the columns in the BigQuery table, the data will still be in
+        the order of the BigQuery table. For example, if the BigQuery table has
+        three columns ``[A,B,C]`` and you pass ``'B,A'`` in ``selected_fields``,
+        the data will still arrive in the form ``'A,B'`` and is passed to MSSQL
+        in that form.
+
+    **Example**: ::
+
+        transfer_data = BigQueryToMsSqlOperator(
+            task_id='task_id',
+            source_project_dataset_table='my-project.mydataset.origin_bq_table',
+            mssql_table='dest_table_name',
+            replace=True,
+        )
+
+    :param source_project_dataset_table: A dotted ``<project>.<dataset>.<table>``:
+        the BigQuery table of origin
+    :type source_project_dataset_table: str
+    :param mssql_table: target MSSQL table
+    :type mssql_table: str
+    :param selected_fields: List of fields to return (comma-separated). If
+        unspecified, all fields are returned.
+    :type selected_fields: str
+    :param gcp_conn_id: reference to a specific Google Cloud hook.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    :param mssql_conn_id: reference to a specific mssql hook
+    :type mssql_conn_id: str
+    :param database: name of the database which overrides the one defined in the connection
+    :type database: str
+    :param replace: Whether to replace instead of insert
+    :type replace: bool
+    :param batch_size: The number of rows to take in each batch
+    :type batch_size: int
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'source_project_dataset_table',
+        'mssql_table',
+        'impersonation_chain',
+    )
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,  # pylint: disable=too-many-arguments
+        source_project_dataset_table: str,
+        mssql_table: str,
+        selected_fields: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        mssql_conn_id: str = 'mssql_default',
+        database: Optional[str] = None,
+        delegate_to: Optional[str] = None,
+        replace: bool = False,
+        batch_size: int = 1000,
+        location: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.selected_fields = selected_fields
+        self.gcp_conn_id = gcp_conn_id
+        self.mssql_conn_id = mssql_conn_id
+        self.database = database
+        self.mssql_table = mssql_table
+        self.replace = replace
+        self.delegate_to = delegate_to
+        self.batch_size = batch_size
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        self.source_project_dataset_table = source_project_dataset_table
+
+    def _bq_get_data(self):
+        """Yield batches of rows from the source BigQuery table, ``batch_size`` rows at a time."""
+        hook = BigQueryHook(
+            bigquery_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            location=self.location,
+            impersonation_chain=self.impersonation_chain,
+        )
+        table_ref = TableReference.from_string(self.source_project_dataset_table)
+        self.log.info('Fetching Data from:')
+        self.log.info('Dataset: %s, Table: %s', table_ref.dataset_id, table_ref.table_id)
+
+        conn = hook.get_conn()
+        cursor = conn.cursor()
+        i = 0
+        while True:
+            response = cursor.get_tabledata(
+                dataset_id=table_ref.dataset_id,
+                table_id=table_ref.table_id,
+                max_results=self.batch_size,
+                selected_fields=self.selected_fields,
+                start_index=i * self.batch_size,
+            )
+
+            if 'rows' not in response:
+                self.log.info('Job Finished')
+                return
+
+            rows = response['rows']
+
+            self.log.info('Total Extracted rows: %s', len(rows) + i * self.batch_size)
+
+            table_data = [[fields['v'] for fields in dict_row['f']] for dict_row in rows]
+
+            yield table_data
+            i += 1
+
+    def execute(self, context):
+        mssql_hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id, schema=self.database)
+        for rows in self._bq_get_data():
+            mssql_hook.insert_rows(self.mssql_table, rows, replace=self.replace)
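[Editor's note] The heart of the operator is _bq_get_data: a generator that
pages through the table with start_index/max_results and flattens the
REST-style row payload ({'f': [{'v': ...}]}) into plain lists. A standalone
sketch of the same pattern, with the hypothetical fetch_page standing in for
cursor.get_tabledata:

    from typing import Callable, Iterator, List


    def paged_rows(fetch_page: Callable[..., dict], batch_size: int = 1000) -> Iterator[List[list]]:
        """Yield batches of plain-list rows until a page arrives without 'rows'."""
        page = 0
        while True:
            response = fetch_page(start_index=page * batch_size, max_results=batch_size)
            if 'rows' not in response:
                return  # a page with no 'rows' key marks the end of the table
            # Each row looks like {'f': [{'v': value}, ...]}; keep only the values.
            yield [[field['v'] for field in row['f']] for row in response['rows']]
            page += 1
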
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index fc195ba..443701a 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -675,6 +675,9 @@ transfers:
   - source-integration-name: Google BigQuery
     target-integration-name: MySQL
     python-module: airflow.providers.google.cloud.transfers.bigquery_to_mysql
+  - source-integration-name: Google BigQuery
+    target-integration-name: Microsoft SQL Server (MSSQL)
+    python-module: airflow.providers.google.cloud.transfers.bigquery_to_mssql
   - source-integration-name: Google Cloud Storage (GCS)
     target-integration-name: Google BigQuery
     python-module: airflow.providers.google.cloud.transfers.gcs_to_bigquery
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py b/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
new file mode 100644
index 0000000..01a54bb
--- /dev/null
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest import mock
+
+from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator
+
+TASK_ID = 'test-bq-to-mssql-operator'
+TEST_PROJECT_ID = 'test-project'
+TEST_DATASET = 'test-dataset'
+TEST_TABLE_ID = 'test-table-id'
+
+
+class TestBigQueryToMsSqlOperator(unittest.TestCase):
+    @mock.patch('airflow.providers.google.cloud.transfers.bigquery_to_mssql.BigQueryHook')
+    def test_execute_good_request_to_bq(self, mock_hook):
+        destination_table = 'table'
+        operator = BigQueryToMsSqlOperator(
+            task_id=TASK_ID,
+            source_project_dataset_table=f'{TEST_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}',
+            mssql_table=destination_table,
+            replace=False,
+        )
+
+        operator.execute(None)
+        # fmt: off
+        mock_hook.return_value.get_conn.return_value.cursor.return_value.get_tabledata\
+            .assert_called_once_with(
+                dataset_id=TEST_DATASET,
+                table_id=TEST_TABLE_ID,
+                max_results=1000,
+                selected_fields=None,
+                start_index=0,
+            )
+        # fmt: on
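
[Editor's note] The committed test only asserts the BigQuery side of the
transfer. A hypothetical companion test (not part of this commit) could also
mock MsSqlHook and check that the fetched rows reach insert_rows; the fake
payload shape below mirrors what get_tabledata returns:

    @mock.patch('airflow.providers.google.cloud.transfers.bigquery_to_mssql.MsSqlHook')
    @mock.patch('airflow.providers.google.cloud.transfers.bigquery_to_mssql.BigQueryHook')
    def test_rows_reach_mssql(self, mock_bq_hook, mock_mssql_hook):
        cursor = mock_bq_hook.return_value.get_conn.return_value.cursor.return_value
        cursor.get_tabledata.side_effect = [
            {'rows': [{'f': [{'v': 'Alice'}, {'v': 100}]}]},  # one page with one row
            {},  # no 'rows' key: terminates the pagination loop
        ]
        operator = BigQueryToMsSqlOperator(
            task_id=TASK_ID,
            source_project_dataset_table=f'{TEST_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}',
            mssql_table='table',
            replace=False,
        )
        operator.execute(None)
        mock_mssql_hook.return_value.insert_rows.assert_called_once_with(
            'table', [['Alice', 100]], replace=False
        )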