You are viewing a plain-text version of this content; the link to the canonical version (originally a "here" hyperlink) was not preserved in this copy.
Posted to commits@airflow.apache.org by el...@apache.org on 2023/01/09 08:10:23 UTC
[airflow] branch main updated: Fix BigQueryColumnCheckOperator runtime error (#28796)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c67f4af667 Fix BigQueryColumnCheckOperator runtime error (#28796)
c67f4af667 is described below
commit c67f4af667948e654585e6df102663670804819e
Author: Victor Chiapaikeo <vc...@gmail.com>
AuthorDate: Mon Jan 9 03:10:13 2023 -0500
Fix BigQueryColumnCheckOperator runtime error (#28796)
---
.../providers/google/cloud/operators/bigquery.py | 2 +-
.../google/cloud/operators/test_bigquery.py | 63 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 5b2f0d5c34..eeb003eb69 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -611,7 +611,7 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
self.column_mapping[column][check], result, tolerance
)
- failed_tests(
+ failed_tests.extend(
f"Column: {col}\n\tCheck: {check},\n\tCheck Values: {check_values}\n"
for col, checks in self.column_mapping.items()
for check, check_values in checks.items()
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 94a906e20f..d699d6cedb 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -21,6 +21,7 @@ import unittest
from unittest import mock
from unittest.mock import MagicMock
+import pandas as pd
import pytest
from google.cloud.bigquery import DEFAULT_RETRY
from google.cloud.exceptions import Conflict
@@ -31,6 +32,7 @@ from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCheckOperator,
+ BigQueryColumnCheckOperator,
BigQueryConsoleIndexableLink,
BigQueryConsoleLink,
BigQueryCreateEmptyDatasetOperator,
@@ -1676,3 +1678,66 @@ def test_bigquery_value_check_empty():
     with pytest.raises(AirflowException) as missing_param:
         BigQueryValueCheckOperator(deferrable=True, kwargs={})
     assert (missing_param.value.args[0] == expected) or (missing_param.value.args[0] == expected1)
+
+
+@pytest.mark.parametrize(
+    "check_type, check_value, check_result",
+    [
+        ("equal_to", 0, 0),
+        ("greater_than", 0, 1),
+        ("less_than", 0, -1),
+        ("geq_to", 0, 1),
+        ("geq_to", 0, 0),
+        ("leq_to", 0, 0),
+        ("leq_to", 0, -1),
+    ],
+)
+@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+def test_bigquery_column_check_operator_succeeds(mock_job, mock_hook, check_type, check_value, check_result):
+    """execute() does not raise when the mocked ``min`` result satisfies the parametrized check."""
+    mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+        {"col_name": ["col1"], "check_type": ["min"], "check_result": [check_result]}
+    )
+    mock_hook.return_value.insert_job.return_value = mock_job
+
+    op = BigQueryColumnCheckOperator(
+        task_id="check_column_succeeds",
+        table=TEST_TABLE_ID,
+        use_legacy_sql=False,
+        column_mapping={
+            "col1": {"min": {check_type: check_value}},
+        },
+    )
+    op.execute(create_context(op))
+
+
+@pytest.mark.parametrize(
+    "check_type, check_value, check_result",
+    [
+        ("equal_to", 0, 1),
+        ("greater_than", 0, -1),
+        ("less_than", 0, 1),
+        ("geq_to", 0, -1),
+        ("leq_to", 0, 1),
+    ],
+)
+@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryJob")
+def test_bigquery_column_check_operator_fails(mock_job, mock_hook, check_type, check_value, check_result):
+    """execute() raises AirflowException when the mocked ``min`` result violates the parametrized check."""
+    mock_job.result.return_value.to_dataframe.return_value = pd.DataFrame(
+        {"col_name": ["col1"], "check_type": ["min"], "check_result": [check_result]}
+    )
+    mock_hook.return_value.insert_job.return_value = mock_job
+
+    op = BigQueryColumnCheckOperator(
+        task_id="check_column_fails",
+        table=TEST_TABLE_ID,
+        use_legacy_sql=False,
+        column_mapping={
+            "col1": {"min": {check_type: check_value}},
+        },
+    )
+    with pytest.raises(AirflowException):
+        op.execute(create_context(op))