You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/31 04:21:51 UTC
[airflow] branch main updated: Typecast bigquery job response col value (#27236)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1447158e69 Typecast bigquery job response col value (#27236)
1447158e69 is described below
commit 1447158e690f3d63981b3d8ec065665ec91ca54e
Author: Pankaj Singh <98...@users.noreply.github.com>
AuthorDate: Mon Oct 31 09:51:43 2022 +0530
Typecast bigquery job response col value (#27236)
---
airflow/providers/google/cloud/hooks/bigquery.py | 4 ++-
.../providers/google/cloud/hooks/test_bigquery.py | 30 ++++++++++++++++++++--
.../google/cloud/triggers/test_bigquery.py | 11 +++++---
3 files changed, 39 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 0618967eb3..c835ead045 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -3065,8 +3065,10 @@ class BigQueryAsyncHook(GoogleBaseAsyncHook):
buffer = []
if "rows" in query_results and query_results["rows"]:
rows = query_results["rows"]
+ fields = query_results["schema"]["fields"]
+ col_types = [field["type"] for field in fields]
for dict_row in rows:
- typed_row = [vs["v"] for vs in dict_row["f"]]
+ typed_row = [_bq_cast(vs["v"], col_types[idx]) for idx, vs in enumerate(dict_row["f"])]
buffer.append(typed_row)
return buffer
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py
index f10d8de087..15f815b6a6 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -170,7 +170,6 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
r"\['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'\]"
),
):
-
self.hook.run_load(
"test.test",
"test_schema.json",
@@ -185,7 +184,6 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
match="schema_update_options is only allowed if"
" write_disposition is 'WRITE_APPEND' or 'WRITE_TRUNCATE'.",
):
-
self.hook.run_load(
"test.test",
"test_schema.json",
@@ -2277,3 +2275,31 @@ class TestBigQueryAsyncHookMethods(_BigQueryBaseAsyncTestClass):
dataset=DATASET_ID, project_id=PROJECT_ID, table_id=TABLE_ID, session=mock_session
)
assert isinstance(result, Table_async)
+
+ def test_get_records_return_type(self):
+ query_result = {
+ "kind": "bigquery#getQueryResultsResponse",
+ "etag": "test_etag",
+ "schema": {
+ "fields": [
+ {"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"},
+ {"name": "f1_", "type": "FLOAT", "mode": "NULLABLE"},
+ {"name": "f2_", "type": "STRING", "mode": "NULLABLE"},
+ ]
+ },
+ "jobReference": {
+ "projectId": "test_airflow-providers",
+ "jobId": "test_jobid",
+ "location": "US",
+ },
+ "totalRows": "1",
+ "rows": [{"f": [{"v": "22"}, {"v": "3.14"}, {"v": "PI"}]}],
+ "totalBytesProcessed": "0",
+ "jobComplete": True,
+ "cacheHit": False,
+ }
+ hook = BigQueryAsyncHook()
+ result = hook.get_records(query_result)
+ assert isinstance(result[0][0], int)
+ assert isinstance(result[0][1], float)
+ assert isinstance(result[0][2], str)
diff --git a/tests/providers/google/cloud/triggers/test_bigquery.py b/tests/providers/google/cloud/triggers/test_bigquery.py
index 2b72366aad..d8609807c0 100644
--- a/tests/providers/google/cloud/triggers/test_bigquery.py
+++ b/tests/providers/google/cloud/triggers/test_bigquery.py
@@ -425,7 +425,7 @@ async def test_bigquery_check_op_trigger_success_with_data(mock_job_output, mock
generator = trigger.run()
actual = await generator.asend(None)
- assert TriggerEvent({"status": "success", "records": ["22"]}) == actual
+ assert TriggerEvent({"status": "success", "records": [22]}) == actual
@pytest.mark.asyncio
@@ -507,7 +507,12 @@ async def test_bigquery_get_data_trigger_success_with_data(mock_job_output, mock
mock_job_output.return_value = {
"kind": "bigquery#tableDataList",
"etag": "test_etag",
- "schema": {"fields": [{"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"}]},
+ "schema": {
+ "fields": [
+ {"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"},
+ {"name": "f1_", "type": "STRING", "mode": "NULLABLE"},
+ ]
+ },
"jobReference": {
"projectId": "test-airflow-providers",
"jobId": "test_jobid",
@@ -539,7 +544,7 @@ async def test_bigquery_get_data_trigger_success_with_data(mock_job_output, mock
{
"status": "success",
"message": "success",
- "records": [["42", "monthy python"], ["42", "fishy fish"]],
+ "records": [[42, "monthy python"], [42, "fishy fish"]],
}
)
== actual