You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/03/04 22:58:06 UTC

[airflow] branch main updated: Add "BOOLEAN" to type_map of MSSQLToGCSOperator, fix incorrect bit->int type conversion by specifying BIT fields explicitly (#29902)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 035ad26d79 Add "BOOLEAN" to type_map of MSSQLToGCSOperator, fix incorrect bit->int type conversion by specifying BIT fields explicitly (#29902)
035ad26d79 is described below

commit 035ad26d79848c63049307a94c04a9a3916d8a38
Author: Shahar Epstein <60...@users.noreply.github.com>
AuthorDate: Sun Mar 5 00:57:55 2023 +0200

    Add "BOOLEAN" to type_map of MSSQLToGCSOperator, fix incorrect bit->int type conversion by specifying BIT fields explicitly (#29902)
---
 .../google/cloud/transfers/mssql_to_gcs.py         | 20 ++++++++++--
 .../google/cloud/transfers/test_mssql_to_gcs.py    | 37 ++++++++++++++++++----
 2 files changed, 48 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
index 0c12c01f4a..152145a8cb 100644
--- a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 
 import datetime
 import decimal
+from typing import Sequence
 
 from airflow.providers.google.cloud.transfers.sql_to_gcs import BaseSQLToGCSOperator
 from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
@@ -29,6 +30,10 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
     """Copy data from Microsoft SQL Server to Google Cloud Storage
     in JSON, CSV or Parquet format.
 
+    :param bit_fields: Sequence of field names of MSSQL "BIT" data type,
+        to be interpreted in the schema as "BOOLEAN". "BIT" fields that are
+        not included in this sequence will be interpreted as "INTEGER" by
+        default.
     :param mssql_conn_id: Reference to a specific MSSQL hook.
 
     **Example**:
@@ -39,6 +44,7 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
             export_customers = MsSqlToGoogleCloudStorageOperator(
                 task_id='export_customers',
                 sql='SELECT * FROM dbo.Customers;',
+                bit_fields=['some_bit_field', 'another_bit_field'],
                 bucket='mssql-export',
                 filename='data/customers/export.json',
                 schema_filename='schemas/export.json',
@@ -55,11 +61,18 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
 
     ui_color = "#e0a98c"
 
-    type_map = {3: "INTEGER", 4: "TIMESTAMP", 5: "NUMERIC"}
+    type_map = {2: "BOOLEAN", 3: "INTEGER", 4: "TIMESTAMP", 5: "NUMERIC"}
 
-    def __init__(self, *, mssql_conn_id="mssql_default", **kwargs):
+    def __init__(
+        self,
+        *,
+        bit_fields: Sequence[str] | None = None,
+        mssql_conn_id="mssql_default",
+        **kwargs,
+    ):
         super().__init__(**kwargs)
         self.mssql_conn_id = mssql_conn_id
+        self.bit_fields = bit_fields if bit_fields else []
 
     def query(self):
         """
@@ -74,6 +87,9 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
         return cursor
 
     def field_to_bigquery(self, field) -> dict[str, str]:
+        if field[0] in self.bit_fields:
+            field = (field[0], 2)
+
         return {
             "name": field[0].replace(" ", "_"),
             "type": self.type_map.get(field[1], "STRING"),
diff --git a/tests/providers/google/cloud/transfers/test_mssql_to_gcs.py b/tests/providers/google/cloud/transfers/test_mssql_to_gcs.py
index 71b626b60f..935dcde305 100644
--- a/tests/providers/google/cloud/transfers/test_mssql_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_mssql_to_gcs.py
@@ -35,20 +35,35 @@ BUCKET = "gs://test"
 JSON_FILENAME = "test_{}.ndjson"
 GZIP = False
 
-ROWS = [("mock_row_content_1", 42), ("mock_row_content_2", 43), ("mock_row_content_3", 44)]
+ROWS = [
+    ("mock_row_content_1", 42, True, True),
+    ("mock_row_content_2", 43, False, False),
+    ("mock_row_content_3", 44, True, True),
+]
 CURSOR_DESCRIPTION = (
     ("some_str", 0, None, None, None, None, None),
     ("some_num", 3, None, None, None, None, None),
+    ("some_binary", 2, None, None, None, None, None),
+    ("some_bit", 3, None, None, None, None, None),
 )
 NDJSON_LINES = [
-    b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
-    b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
-    b'{"some_num": 44, "some_str": "mock_row_content_3"}\n',
+    b'{"some_binary": true, "some_bit": true, "some_num": 42, "some_str": "mock_row_content_1"}\n',
+    b'{"some_binary": false, "some_bit": false, "some_num": 43, "some_str": "mock_row_content_2"}\n',
+    b'{"some_binary": true, "some_bit": true, "some_num": 44, "some_str": "mock_row_content_3"}\n',
 ]
 SCHEMA_FILENAME = "schema_test.json"
 SCHEMA_JSON = [
     b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
-    b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}]',
+    b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, ',
+    b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOLEAN"}, ',
+    b'{"mode": "NULLABLE", "name": "some_bit", "type": "BOOLEAN"}]',
+]
+
+SCHEMA_JSON_BIT_FIELDS = [
+    b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
+    b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, ',
+    b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOLEAN"}, ',
+    b'{"mode": "NULLABLE", "name": "some_bit", "type": "INTEGER"}]',
 ]
 
 
@@ -148,7 +163,10 @@ class TestMsSqlToGoogleCloudStorageOperator:
 
     @mock.patch("airflow.providers.google.cloud.transfers.mssql_to_gcs.MsSqlHook")
     @mock.patch("airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook")
-    def test_schema_file(self, gcs_hook_mock_class, mssql_hook_mock_class):
+    @pytest.mark.parametrize(
+        "bit_fields,schema_json", [(None, SCHEMA_JSON), (["bit_fields"], SCHEMA_JSON_BIT_FIELDS)]
+    )
+    def test_schema_file(self, gcs_hook_mock_class, mssql_hook_mock_class, bit_fields, schema_json):
         """Test writing schema files."""
         mssql_hook_mock = mssql_hook_mock_class.return_value
         mssql_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
@@ -164,7 +182,12 @@ class TestMsSqlToGoogleCloudStorageOperator:
         gcs_hook_mock.upload.side_effect = _assert_upload
 
         op = MSSQLToGCSOperator(
-            task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME, schema_filename=SCHEMA_FILENAME
+            task_id=TASK_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=JSON_FILENAME,
+            schema_filename=SCHEMA_FILENAME,
+            bit_fields=["some_bit"],
         )
         op.execute(None)