You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/01/09 08:10:23 UTC

[airflow] branch main updated: Fix BigQueryColumnCheckOperator runtime error (#28796)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c67f4af667 Fix BigQueryColumnCheckOperator runtime error (#28796)
c67f4af667 is described below

commit c67f4af667948e654585e6df102663670804819e
Author: Victor Chiapaikeo <vc...@gmail.com>
AuthorDate: Mon Jan 9 03:10:13 2023 -0500

    Fix BigQueryColumnCheckOperator runtime error (#28796)
---
 .../providers/google/cloud/operators/bigquery.py   |  2 +-
 .../google/cloud/operators/test_bigquery.py        | 63 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 5b2f0d5c34..eeb003eb69 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -611,7 +611,7 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
                 self.column_mapping[column][check], result, tolerance
             )
 
-        failed_tests(
+        failed_tests.extend(
             f"Column: {col}\n\tCheck: {check},\n\tCheck Values: {check_values}\n"
             for col, checks in self.column_mapping.items()
             for check, check_values in checks.items()
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 94a906e20f..d699d6cedb 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -21,6 +21,7 @@ import unittest
 from unittest import mock
 from unittest.mock import MagicMock
 
+import pandas as pd
 import pytest
 from google.cloud.bigquery import DEFAULT_RETRY
 from google.cloud.exceptions import Conflict
@@ -31,6 +32,7 @@ from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCheckOperator,
+    BigQueryColumnCheckOperator,
     BigQueryConsoleIndexableLink,
     BigQueryConsoleLink,
     BigQueryCreateEmptyDatasetOperator,
@@ -1676,3 +1678,64 @@ def test_bigquery_value_check_empty():
     with pytest.raises(AirflowException) as missing_param:
         BigQueryValueCheckOperator(deferrable=True, kwargs={})
     assert (missing_param.value.args[0] == expected) or (missing_param.value.args[0] == expected1)
+
+
+@pytest.mark.parametrize(
+    "check_type, check_value, check_result",
+    [  # each case: a comparison, its threshold, and a mocked result expected to satisfy it
+        ("equal_to", 0, 0),
+        ("greater_than", 0, 1),
+        ("less_than", 0, -1),
+        ("geq_to", 0, 1),
+        ("geq_to", 0, 0),
+        ("leq_to", 0, 0),
+        ("leq_to", 0, -1),
+    ],
+)
+@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+def test_bigquery_column_check_operator_succeeds(mock_job, mock_hook, check_type, check_value, check_result):
+    mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+        {"col_name": ["col1"], "check_type": ["min"], "check_result": [check_result]}
+    )  # one-row frame the mocked job returns: the "min" check on col1 with the parametrized result
+    mock_hook.return_value.insert_job.return_value = mock_job  # the patched hook's insert_job() yields that job
+
+    op = BigQueryColumnCheckOperator(
+        task_id="check_column_succeeds",
+        table=TEST_TABLE_ID,
+        use_legacy_sql=False,
+        column_mapping={
+            "col1": {"min": {check_type: check_value}},
+        },
+    )
+    op.execute(create_context(op))  # no explicit assert: the test passes as long as execute() does not raise
+
+
+@pytest.mark.parametrize(
+    "check_type, check_value, check_result",
+    [  # each case: a comparison, its threshold, and a mocked result that violates it
+        ("equal_to", 0, 1),
+        ("greater_than", 0, -1),
+        ("less_than", 0, 1),
+        ("geq_to", 0, -1),
+        ("leq_to", 0, 1),
+    ],
+)
+@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+def test_bigquery_column_check_operator_fails(mock_job, mock_hook, check_type, check_value, check_result):
+    mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+        {"col_name": ["col1"], "check_type": ["min"], "check_result": [check_result]}
+    )  # feed the parametrized (failing) result instead of a hard-coded value
+    mock_hook.return_value.insert_job.return_value = mock_job  # the patched hook's insert_job() yields that job
+
+    op = BigQueryColumnCheckOperator(
+        task_id="check_column_fails",
+        table=TEST_TABLE_ID,
+        use_legacy_sql=False,
+        column_mapping={
+            "col1": {"min": {check_type: check_value}},  # exercise every parametrized comparison type
+        },
+    )
+    with pytest.raises(AirflowException):  # the violated check must surface as a task failure
+        op.execute(create_context(op))