You are viewing a plain-text version of this content. The canonical link to the original (the "here" hyperlink) is not included in this plain-text copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/31 04:21:51 UTC

[airflow] branch main updated: Typecast biquery job response col value (#27236)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1447158e69 Typecast biquery job response col value (#27236)
1447158e69 is described below

commit 1447158e690f3d63981b3d8ec065665ec91ca54e
Author: Pankaj Singh <98...@users.noreply.github.com>
AuthorDate: Mon Oct 31 09:51:43 2022 +0530

    Typecast biquery job response col value (#27236)
---
 airflow/providers/google/cloud/hooks/bigquery.py   |  4 ++-
 .../providers/google/cloud/hooks/test_bigquery.py  | 30 ++++++++++++++++++++--
 .../google/cloud/triggers/test_bigquery.py         | 11 +++++---
 3 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 0618967eb3..c835ead045 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -3065,8 +3065,10 @@ class BigQueryAsyncHook(GoogleBaseAsyncHook):
         buffer = []
         if "rows" in query_results and query_results["rows"]:
             rows = query_results["rows"]
+            fields = query_results["schema"]["fields"]
+            col_types = [field["type"] for field in fields]
             for dict_row in rows:
-                typed_row = [vs["v"] for vs in dict_row["f"]]
+                typed_row = [_bq_cast(vs["v"], col_types[idx]) for idx, vs in enumerate(dict_row["f"])]
                 buffer.append(typed_row)
         return buffer
 
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py
index f10d8de087..15f815b6a6 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -170,7 +170,6 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
                 r"\['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'\]"
             ),
         ):
-
             self.hook.run_load(
                 "test.test",
                 "test_schema.json",
@@ -185,7 +184,6 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
             match="schema_update_options is only allowed if"
             " write_disposition is 'WRITE_APPEND' or 'WRITE_TRUNCATE'.",
         ):
-
             self.hook.run_load(
                 "test.test",
                 "test_schema.json",
@@ -2277,3 +2275,31 @@ class TestBigQueryAsyncHookMethods(_BigQueryBaseAsyncTestClass):
             dataset=DATASET_ID, project_id=PROJECT_ID, table_id=TABLE_ID, session=mock_session
         )
         assert isinstance(result, Table_async)
+
+    def test_get_records_return_type(self):
+        query_result = {
+            "kind": "bigquery#getQueryResultsResponse",
+            "etag": "test_etag",
+            "schema": {
+                "fields": [
+                    {"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"},
+                    {"name": "f1_", "type": "FLOAT", "mode": "NULLABLE"},
+                    {"name": "f2_", "type": "STRING", "mode": "NULLABLE"},
+                ]
+            },
+            "jobReference": {
+                "projectId": "test_airflow-providers",
+                "jobId": "test_jobid",
+                "location": "US",
+            },
+            "totalRows": "1",
+            "rows": [{"f": [{"v": "22"}, {"v": "3.14"}, {"v": "PI"}]}],
+            "totalBytesProcessed": "0",
+            "jobComplete": True,
+            "cacheHit": False,
+        }
+        hook = BigQueryAsyncHook()
+        result = hook.get_records(query_result)
+        assert isinstance(result[0][0], int)
+        assert isinstance(result[0][1], float)
+        assert isinstance(result[0][2], str)
diff --git a/tests/providers/google/cloud/triggers/test_bigquery.py b/tests/providers/google/cloud/triggers/test_bigquery.py
index 2b72366aad..d8609807c0 100644
--- a/tests/providers/google/cloud/triggers/test_bigquery.py
+++ b/tests/providers/google/cloud/triggers/test_bigquery.py
@@ -425,7 +425,7 @@ async def test_bigquery_check_op_trigger_success_with_data(mock_job_output, mock
     generator = trigger.run()
     actual = await generator.asend(None)
 
-    assert TriggerEvent({"status": "success", "records": ["22"]}) == actual
+    assert TriggerEvent({"status": "success", "records": [22]}) == actual
 
 
 @pytest.mark.asyncio
@@ -507,7 +507,12 @@ async def test_bigquery_get_data_trigger_success_with_data(mock_job_output, mock
     mock_job_output.return_value = {
         "kind": "bigquery#tableDataList",
         "etag": "test_etag",
-        "schema": {"fields": [{"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"}]},
+        "schema": {
+            "fields": [
+                {"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"},
+                {"name": "f1_", "type": "STRING", "mode": "NULLABLE"},
+            ]
+        },
         "jobReference": {
             "projectId": "test-airflow-providers",
             "jobId": "test_jobid",
@@ -539,7 +544,7 @@ async def test_bigquery_get_data_trigger_success_with_data(mock_job_output, mock
             {
                 "status": "success",
                 "message": "success",
-                "records": [["42", "monthy python"], ["42", "fishy fish"]],
+                "records": [[42, "monthy python"], [42, "fishy fish"]],
             }
         )
         == actual