You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/03/04 22:58:06 UTC
[airflow] branch main updated: Add "BOOLEAN" to type_map of MSSQLToGCSOperator, fix incorrect bit->int type conversion by specifying BIT fields explicitly (#29902)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 035ad26d79 Add "BOOLEAN" to type_map of MSSQLToGCSOperator, fix incorrect bit->int type conversion by specifying BIT fields explicitly (#29902)
035ad26d79 is described below
commit 035ad26d79848c63049307a94c04a9a3916d8a38
Author: Shahar Epstein <60...@users.noreply.github.com>
AuthorDate: Sun Mar 5 00:57:55 2023 +0200
Add "BOOLEAN" to type_map of MSSQLToGCSOperator, fix incorrect bit->int type conversion by specifying BIT fields explicitly (#29902)
---
.../google/cloud/transfers/mssql_to_gcs.py | 20 ++++++++++--
.../google/cloud/transfers/test_mssql_to_gcs.py | 37 ++++++++++++++++++----
2 files changed, 48 insertions(+), 9 deletions(-)
diff --git a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
index 0c12c01f4a..152145a8cb 100644
--- a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import datetime
import decimal
+from typing import Sequence
from airflow.providers.google.cloud.transfers.sql_to_gcs import BaseSQLToGCSOperator
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
@@ -29,6 +30,10 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
"""Copy data from Microsoft SQL Server to Google Cloud Storage
in JSON, CSV or Parquet format.
+ :param bit_fields: Sequence of field names of MSSQL "BIT" data type,
+ to be interpreted in the schema as "BOOLEAN". "BIT" fields that are
+ not included in this sequence will be interpreted as "INTEGER" by
+ default.
:param mssql_conn_id: Reference to a specific MSSQL hook.
**Example**:
@@ -39,6 +44,7 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
export_customers = MsSqlToGoogleCloudStorageOperator(
task_id='export_customers',
sql='SELECT * FROM dbo.Customers;',
+ bit_fields=['some_bit_field', 'another_bit_field'],
bucket='mssql-export',
filename='data/customers/export.json',
schema_filename='schemas/export.json',
@@ -55,11 +61,18 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
ui_color = "#e0a98c"
- type_map = {3: "INTEGER", 4: "TIMESTAMP", 5: "NUMERIC"}
+ type_map = {2: "BOOLEAN", 3: "INTEGER", 4: "TIMESTAMP", 5: "NUMERIC"}
- def __init__(self, *, mssql_conn_id="mssql_default", **kwargs):
+ def __init__(
+ self,
+ *,
+ bit_fields: Sequence[str] | None = None,
+ mssql_conn_id="mssql_default",
+ **kwargs,
+ ):
super().__init__(**kwargs)
self.mssql_conn_id = mssql_conn_id
+ self.bit_fields = bit_fields if bit_fields else []
def query(self):
"""
@@ -74,6 +87,9 @@ class MSSQLToGCSOperator(BaseSQLToGCSOperator):
return cursor
def field_to_bigquery(self, field) -> dict[str, str]:
+ if field[0] in self.bit_fields:
+ field = (field[0], 2)
+
return {
"name": field[0].replace(" ", "_"),
"type": self.type_map.get(field[1], "STRING"),
diff --git a/tests/providers/google/cloud/transfers/test_mssql_to_gcs.py b/tests/providers/google/cloud/transfers/test_mssql_to_gcs.py
index 71b626b60f..935dcde305 100644
--- a/tests/providers/google/cloud/transfers/test_mssql_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_mssql_to_gcs.py
@@ -35,20 +35,35 @@ BUCKET = "gs://test"
JSON_FILENAME = "test_{}.ndjson"
GZIP = False
-ROWS = [("mock_row_content_1", 42), ("mock_row_content_2", 43), ("mock_row_content_3", 44)]
+ROWS = [
+ ("mock_row_content_1", 42, True, True),
+ ("mock_row_content_2", 43, False, False),
+ ("mock_row_content_3", 44, True, True),
+]
CURSOR_DESCRIPTION = (
("some_str", 0, None, None, None, None, None),
("some_num", 3, None, None, None, None, None),
+ ("some_binary", 2, None, None, None, None, None),
+ ("some_bit", 3, None, None, None, None, None),
)
NDJSON_LINES = [
- b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
- b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
- b'{"some_num": 44, "some_str": "mock_row_content_3"}\n',
+ b'{"some_binary": true, "some_bit": true, "some_num": 42, "some_str": "mock_row_content_1"}\n',
+ b'{"some_binary": false, "some_bit": false, "some_num": 43, "some_str": "mock_row_content_2"}\n',
+ b'{"some_binary": true, "some_bit": true, "some_num": 44, "some_str": "mock_row_content_3"}\n',
]
SCHEMA_FILENAME = "schema_test.json"
SCHEMA_JSON = [
b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
- b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}]',
+ b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, ',
+ b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOLEAN"}, ',
+ b'{"mode": "NULLABLE", "name": "some_bit", "type": "BOOLEAN"}]',
+]
+
+SCHEMA_JSON_BIT_FIELDS = [
+ b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, ',
+ b'{"mode": "NULLABLE", "name": "some_num", "type": "INTEGER"}, ',
+ b'{"mode": "NULLABLE", "name": "some_binary", "type": "BOOLEAN"}, ',
+ b'{"mode": "NULLABLE", "name": "some_bit", "type": "INTEGER"}]',
]
@@ -148,7 +163,10 @@ class TestMsSqlToGoogleCloudStorageOperator:
@mock.patch("airflow.providers.google.cloud.transfers.mssql_to_gcs.MsSqlHook")
@mock.patch("airflow.providers.google.cloud.transfers.sql_to_gcs.GCSHook")
- def test_schema_file(self, gcs_hook_mock_class, mssql_hook_mock_class):
+ @pytest.mark.parametrize(
+ "bit_fields,schema_json", [(None, SCHEMA_JSON_BIT_FIELDS), (["some_bit"], SCHEMA_JSON)]
+ )
+ def test_schema_file(self, gcs_hook_mock_class, mssql_hook_mock_class, bit_fields, schema_json):
"""Test writing schema files."""
mssql_hook_mock = mssql_hook_mock_class.return_value
mssql_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
@@ -164,7 +182,12 @@ class TestMsSqlToGoogleCloudStorageOperator:
gcs_hook_mock.upload.side_effect = _assert_upload
op = MSSQLToGCSOperator(
- task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=JSON_FILENAME, schema_filename=SCHEMA_FILENAME
+ task_id=TASK_ID,
+ sql=SQL,
+ bucket=BUCKET,
+ filename=JSON_FILENAME,
+ schema_filename=SCHEMA_FILENAME,
+ bit_fields=bit_fields,
)
op.execute(None)