Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/28 19:04:55 UTC

[GitHub] [airflow] denimalpaca opened a new pull request, #26761: Common sql bugfixes and improvements

denimalpaca opened a new pull request, #26761:
URL: https://github.com/apache/airflow/pull/26761

   Large PR that fixes a few bugs, reworks the check operators, and adds some new functionality.
   List of changes:
   - Fix a test for the BigQueryTableCheckOperator
   - Remove unnecessary job_id generation in BigQuery check operators
   - Rework SQL building and move it from `execute()` to `__init__()`
   - Rework SQLColumnCheckOperator to send a single query with a UNION ALL of checks for each column
   - Add a check-level partition statement to allow greater functionality in each check
   - Add an option to raise an AirflowFailException, which short-circuits retries on failure (#26712); a usage sketch follows below
   
   closes: #26712
   related: #ISSUE
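
   For illustration, a minimal usage sketch of the reworked `SQLColumnCheckOperator`, using the parameter names that appear in the diffs below (`partition_clause` at both operator and check level, `retry_on_failure`); the connection, table, and column names are placeholders:

   ```python
   # Sketch only: conn_id, table, and column names are made up.
   from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator

   column_checks = SQLColumnCheckOperator(
       task_id="column_checks",
       conn_id="my_db",
       table="sales.orders",
       column_mapping={
           "order_id": {
               "null_check": {"equal_to": 0},
               "unique_check": {"equal_to": 0},
           },
           "amount": {
               # check-level partition: applies only to this check
               "min": {"greater_than": 0, "partition_clause": "status = 'active'"},
           },
       },
       # operator-level partition: combined (AND) with any check-level clause
       partition_clause="order_date > '2022-01-01'",
       # raise AirflowFailException instead of AirflowException, skipping retries
       retry_on_failure=False,
   )
   ```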




[GitHub] [airflow] denimalpaca commented on pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on PR #26761:
URL: https://github.com/apache/airflow/pull/26761#issuecomment-1271761139

   @uranusjr @eladkal @TJaniF @turbaszek one of the new additions here is partitioning at the check level; it is currently implemented via a new dict key `where`, which I personally don't like, and I would appreciate it if you all could help me brainstorm a better, more descriptive term. I will update the docs once the name is confirmed.
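
   For context, a hypothetical check entry using the key described here might look like this (the name `where` is exactly what is being debated; the later diffs in this thread use `partition_clause` instead):

   ```python
   # Draft naming only; the final key name is still under discussion.
   column_mapping = {
       "amount": {
           "min": {
               "greater_than": 0,
               "where": "status = 'active'",  # check-level partition/filter
           },
       },
   }
   ```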




[GitHub] [airflow] denimalpaca commented on pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on PR #26761:
URL: https://github.com/apache/airflow/pull/26761#issuecomment-1292438001

   @potiuk fixed at time of commenting :)




[GitHub] [airflow] potiuk commented on pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26761:
URL: https://github.com/apache/airflow/pull/26761#issuecomment-1292700310

   ❤️ 




[GitHub] [airflow] bhirsz commented on pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
bhirsz commented on PR #26761:
URL: https://github.com/apache/airflow/pull/26761#issuecomment-1261441480

   I have left a few small nits, but I will do a full review tomorrow (I'm on the phone at the moment). I can also run the tests, system tests included.




[GitHub] [airflow] bhirsz commented on a diff in pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
bhirsz commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r982839083


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -60,6 +74,12 @@ def _get_failed_checks(checks, col=None):
     ]
 
 
+def _raise_exception(exception_string, retry_on_failure):
+    if retry_on_failure:
+        raise AirflowException(exception_string)

Review Comment:
   This method raises the same exception whether or not `retry_on_failure` is True.





[GitHub] [airflow] denimalpaca commented on pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on PR #26761:
URL: https://github.com/apache/airflow/pull/26761#issuecomment-1286971823

   Hey @eladkal @turbaszek, I would really appreciate a review on this PR! I'm hoping to get it merged before the next provider release. Thank you!




[GitHub] [airflow] bhirsz commented on a diff in pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
bhirsz commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r982839523


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -33,7 +33,21 @@
     from airflow.utils.context import Context
 
 
-def parse_boolean(val: str) -> str | bool:
+def _convert_to_float_if_possible(s):
+    """
+    A small helper function to convert a string to a numeric value

Review Comment:
   The code is simple and self-explanatory; I think we can skip the docstring.





[GitHub] [airflow] denimalpaca commented on pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on PR #26761:
URL: https://github.com/apache/airflow/pull/26761#issuecomment-1282376816

   @uranusjr can we merge this PR if there are no other bugs to fix or feature requests to add? Thanks! 




[GitHub] [airflow] uranusjr commented on a diff in pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r982953944


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -60,6 +74,12 @@ def _get_failed_checks(checks, col=None):
     ]
 
 
+def _raise_exception(exception_string, retry_on_failure):

Review Comment:
   ```suggestion
   def _raise_exception(exception_string: str, retry_on_failure: bool) -> NoReturn:
   ```
   
   NoReturn should be imported from typing.
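
   For reference, a sketch of the suggested signature with the import in place (the function body is taken from the later diff in this thread):

   ```python
   from typing import NoReturn

   from airflow.exceptions import AirflowException, AirflowFailException


   def _raise_exception(exception_string: str, retry_on_failure: bool) -> NoReturn:
       # NoReturn signals to type checkers that this function always raises.
       if retry_on_failure:
           raise AirflowException(exception_string)
       raise AirflowFailException(exception_string)
   ```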





[GitHub] [airflow] uranusjr commented on a diff in pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r982953375


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -33,7 +33,21 @@
     from airflow.utils.context import Context
 
 
-def parse_boolean(val: str) -> str | bool:
+def _convert_to_float_if_possible(s):
+    """
+    A small helper function to convert a string to a numeric value

Review Comment:
   Type hints would, however, be a good idea here.





[GitHub] [airflow] denimalpaca commented on a diff in pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r993446566


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -229,49 +237,81 @@ def __init__(
         **kwargs,
     ):
         super().__init__(conn_id=conn_id, database=database, **kwargs)
-        for checks in column_mapping.values():
-            for check, check_values in checks.items():
-                self._column_mapping_validation(check, check_values)
 
         self.table = table
         self.column_mapping = column_mapping
         self.partition_clause = partition_clause
-        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
-        self.sql = f"SELECT * FROM {self.table};"
+
+        checks_sql_list = []
+        for column, checks in self.column_mapping.items():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+            checks_sql_list.append(self._generate_sql_query(column, checks))
+        checks_sql = "UNION ALL".join(checks_sql_list)
+
+        self.sql = f"SELECT col_name, check_type, check_result FROM ({checks_sql}) AS check_columns"
 
     def execute(self, context: Context):
-        hook = self.get_db_hook()
         failed_tests = []
-        for column in self.column_mapping:
-            checks = [*self.column_mapping[column]]
-            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
-            partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
-            self.sql = f"SELECT {checks_sql} FROM {self.table} {partition_clause_statement};"
-            records = hook.get_first(self.sql)
+        hook = self.get_db_hook()
+        records = hook.get_records(self.sql)
 
-            if not records:
-                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+        if not records:
+            _raise_exception(f"The following query returned zero rows: {self.sql}", self.retry_on_failure)
 
-            self.log.info("Record: %s", records)
+        self.log.info("Record: %s", records)
 
-            for idx, result in enumerate(records):
-                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+        for row in records:
+            column, check, result = row
+            tolerance = self.column_mapping[column][check].get("tolerance")
 
-                self.column_mapping[column][checks[idx]]["result"] = result
-                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
-                    self.column_mapping[column][checks[idx]], result, tolerance
-                )
+            self.column_mapping[column][check]["result"] = result
+            self.column_mapping[column][check]["success"] = self._get_match(
+                self.column_mapping[column][check], result, tolerance
+            )
 
-            failed_tests.extend(_get_failed_checks(self.column_mapping[column], column))
-        if failed_tests:
-            raise AirflowException(
-                f"Test failed.\nResults:\n{records!s}\n"
-                "The following tests have failed:"
-                f"\n{''.join(failed_tests)}"
+        for col, checks in self.column_mapping.items():
+            failed_tests.extend(
+                [
+                    f"Column: {col}\n\tCheck: {check},\n\tCheck Values: {check_values}\n"
+                    for check, check_values in checks.items()
+                    if not check_values["success"]
+                ]
             )

Review Comment:
   Didn't know you could do multiple loops in list comprehensions!
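
   For anyone else unfamiliar with the pattern, a tiny standalone illustration (unrelated to the operator code itself):

   ```python
   # Nested for-clauses behave like nested loops, outermost first.
   pairs = [(col, check) for col in ("a", "b") for check in ("null_check", "min")]
   # [('a', 'null_check'), ('a', 'min'), ('b', 'null_check'), ('b', 'min')]
   ```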





[GitHub] [airflow] potiuk merged pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #26761:
URL: https://github.com/apache/airflow/pull/26761




[GitHub] [airflow] uranusjr commented on a diff in pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r993243032


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -229,49 +237,81 @@ def __init__(
         **kwargs,
     ):
         super().__init__(conn_id=conn_id, database=database, **kwargs)
-        for checks in column_mapping.values():
-            for check, check_values in checks.items():
-                self._column_mapping_validation(check, check_values)
 
         self.table = table
         self.column_mapping = column_mapping
         self.partition_clause = partition_clause
-        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
-        self.sql = f"SELECT * FROM {self.table};"
+
+        checks_sql_list = []
+        for column, checks in self.column_mapping.items():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+            checks_sql_list.append(self._generate_sql_query(column, checks))
+        checks_sql = "UNION ALL".join(checks_sql_list)
+
+        self.sql = f"SELECT col_name, check_type, check_result FROM ({checks_sql}) AS check_columns"
 
     def execute(self, context: Context):
-        hook = self.get_db_hook()
         failed_tests = []
-        for column in self.column_mapping:
-            checks = [*self.column_mapping[column]]
-            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
-            partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
-            self.sql = f"SELECT {checks_sql} FROM {self.table} {partition_clause_statement};"
-            records = hook.get_first(self.sql)
+        hook = self.get_db_hook()
+        records = hook.get_records(self.sql)
 
-            if not records:
-                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+        if not records:
+            _raise_exception(f"The following query returned zero rows: {self.sql}", self.retry_on_failure)
 
-            self.log.info("Record: %s", records)
+        self.log.info("Record: %s", records)
 
-            for idx, result in enumerate(records):
-                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+        for row in records:
+            column, check, result = row

Review Comment:
   ```suggestion
           for column, check, result in records:
   ```



##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -229,49 +237,81 @@ def __init__(
         **kwargs,
     ):
         super().__init__(conn_id=conn_id, database=database, **kwargs)
-        for checks in column_mapping.values():
-            for check, check_values in checks.items():
-                self._column_mapping_validation(check, check_values)
 
         self.table = table
         self.column_mapping = column_mapping
         self.partition_clause = partition_clause
-        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
-        self.sql = f"SELECT * FROM {self.table};"
+
+        checks_sql_list = []
+        for column, checks in self.column_mapping.items():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+            checks_sql_list.append(self._generate_sql_query(column, checks))
+        checks_sql = "UNION ALL".join(checks_sql_list)
+
+        self.sql = f"SELECT col_name, check_type, check_result FROM ({checks_sql}) AS check_columns"
 
     def execute(self, context: Context):
-        hook = self.get_db_hook()
         failed_tests = []
-        for column in self.column_mapping:
-            checks = [*self.column_mapping[column]]
-            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
-            partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
-            self.sql = f"SELECT {checks_sql} FROM {self.table} {partition_clause_statement};"
-            records = hook.get_first(self.sql)
+        hook = self.get_db_hook()
+        records = hook.get_records(self.sql)
 
-            if not records:
-                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+        if not records:
+            _raise_exception(f"The following query returned zero rows: {self.sql}", self.retry_on_failure)
 
-            self.log.info("Record: %s", records)
+        self.log.info("Record: %s", records)
 
-            for idx, result in enumerate(records):
-                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+        for row in records:
+            column, check, result = row
+            tolerance = self.column_mapping[column][check].get("tolerance")
 
-                self.column_mapping[column][checks[idx]]["result"] = result
-                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
-                    self.column_mapping[column][checks[idx]], result, tolerance
-                )
+            self.column_mapping[column][check]["result"] = result
+            self.column_mapping[column][check]["success"] = self._get_match(
+                self.column_mapping[column][check], result, tolerance
+            )
 
-            failed_tests.extend(_get_failed_checks(self.column_mapping[column], column))
-        if failed_tests:
-            raise AirflowException(
-                f"Test failed.\nResults:\n{records!s}\n"
-                "The following tests have failed:"
-                f"\n{''.join(failed_tests)}"
+        for col, checks in self.column_mapping.items():
+            failed_tests.extend(
+                [
+                    f"Column: {col}\n\tCheck: {check},\n\tCheck Values: {check_values}\n"
+                    for check, check_values in checks.items()
+                    if not check_values["success"]
+                ]
             )
+        if failed_tests:
+            exception_string = f"""
+                Test failed.\nResults:\n{records!s}\n
+                The following tests have failed:
+                \n{''.join(failed_tests)}"""
+            _raise_exception(exception_string, self.retry_on_failure)
 
         self.log.info("All tests have passed")
 
+    def _generate_sql_query(self, column, checks):
+        def _generate_partition_clause(check):
+            if self.partition_clause and "partition_clause" not in checks[check]:
+                return f"WHERE {self.partition_clause}"
+            elif not self.partition_clause and "partition_clause" in checks[check]:
+                return f"WHERE {checks[check]['partition_clause']}"
+            elif self.partition_clause and "partition_clause" in checks[check]:
+                return f"WHERE {self.partition_clause} AND {checks[check]['partition_clause']}"
+            else:
+                return ""
+
+        checks_sql = "UNION ALL".join(
+            [
+                self.sql_check_template.format(
+                    check_statement=self.column_checks[check].format(column=column),
+                    check=check,
+                    table=self.table,
+                    column=column,
+                    partition_clause=_generate_partition_clause(check),
+                )
+                for check in checks
+            ]
+        )

Review Comment:
   ```suggestion
           checks_sql = "UNION ALL".join(
               self.sql_check_template.format(
                   check_statement=self.column_checks[check].format(column=column),
                   check=check,
                   table=self.table,
                   column=column,
                   partition_clause=_generate_partition_clause(check),
               )
               for check in checks
           )
   ```
   
   `join` takes a generator expression, so the extra `[]` is not necessary (and building the extra list slows things down!)
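
   A tiny illustration of the two forms (values are made up):

   ```python
   checks = ["null_check", "min"]
   # With a list comprehension, an intermediate list is built first:
   sql = " UNION ALL ".join([f"SELECT '{c}' AS check_type" for c in checks])
   # With a generator expression, join consumes the items directly:
   sql = " UNION ALL ".join(f"SELECT '{c}' AS check_type" for c in checks)
   ```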



##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -229,49 +237,81 @@ def __init__(
         **kwargs,
     ):
         super().__init__(conn_id=conn_id, database=database, **kwargs)
-        for checks in column_mapping.values():
-            for check, check_values in checks.items():
-                self._column_mapping_validation(check, check_values)
 
         self.table = table
         self.column_mapping = column_mapping
         self.partition_clause = partition_clause
-        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
-        self.sql = f"SELECT * FROM {self.table};"
+
+        checks_sql_list = []
+        for column, checks in self.column_mapping.items():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+            checks_sql_list.append(self._generate_sql_query(column, checks))
+        checks_sql = "UNION ALL".join(checks_sql_list)
+
+        self.sql = f"SELECT col_name, check_type, check_result FROM ({checks_sql}) AS check_columns"
 
     def execute(self, context: Context):
-        hook = self.get_db_hook()
         failed_tests = []
-        for column in self.column_mapping:
-            checks = [*self.column_mapping[column]]
-            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
-            partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
-            self.sql = f"SELECT {checks_sql} FROM {self.table} {partition_clause_statement};"
-            records = hook.get_first(self.sql)
+        hook = self.get_db_hook()
+        records = hook.get_records(self.sql)
 
-            if not records:
-                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+        if not records:
+            _raise_exception(f"The following query returned zero rows: {self.sql}", self.retry_on_failure)
 
-            self.log.info("Record: %s", records)
+        self.log.info("Record: %s", records)
 
-            for idx, result in enumerate(records):
-                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+        for row in records:
+            column, check, result = row
+            tolerance = self.column_mapping[column][check].get("tolerance")
 
-                self.column_mapping[column][checks[idx]]["result"] = result
-                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
-                    self.column_mapping[column][checks[idx]], result, tolerance
-                )
+            self.column_mapping[column][check]["result"] = result
+            self.column_mapping[column][check]["success"] = self._get_match(
+                self.column_mapping[column][check], result, tolerance
+            )
 
-            failed_tests.extend(_get_failed_checks(self.column_mapping[column], column))
-        if failed_tests:
-            raise AirflowException(
-                f"Test failed.\nResults:\n{records!s}\n"
-                "The following tests have failed:"
-                f"\n{''.join(failed_tests)}"
+        for col, checks in self.column_mapping.items():
+            failed_tests.extend(
+                [
+                    f"Column: {col}\n\tCheck: {check},\n\tCheck Values: {check_values}\n"
+                    for check, check_values in checks.items()
+                    if not check_values["success"]
+                ]
             )

Review Comment:
   ```suggestion
           failed_tests = [
               f"Column: {col}\n\tCheck: {check},\n\tCheck Values: {check_values}\n"
               for col, checks in self.column_mapping.items()
               for check, check_values in checks.items()
               if not check_values["success"]
           ]
   ```
   
   And the `failed_tests = []` line above can be removed.



##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -418,46 +461,59 @@ def __init__(
         self.table = table
         self.checks = checks
         self.partition_clause = partition_clause
-        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
-        self.sql = f"SELECT * FROM {self.table};"
+        self.sql = f"SELECT check_name, check_result FROM ({self._generate_sql_query()}) AS check_table"
 
     def execute(self, context: Context):
         hook = self.get_db_hook()
-        checks_sql = " UNION ALL ".join(
-            [
-                self.sql_check_template.replace("check_statement", value["check_statement"])
-                .replace("_check_name", check_name)
-                .replace("table", self.table)
-                for check_name, value in self.checks.items()
-            ]
-        )
-        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
-        self.sql = f"""
-            SELECT check_name, check_result FROM ({checks_sql})
-            AS check_table {partition_clause_statement}
-        """
-
         records = hook.get_records(self.sql)
 
         if not records:
-            raise AirflowException(f"The following query returned zero rows: {self.sql}")
+            _raise_exception(f"The following query returned zero rows: {self.sql}", self.retry_on_failure)
 
         self.log.info("Record:\n%s", records)
 
         for row in records:
             check, result = row
-            self.checks[check]["success"] = parse_boolean(str(result))
+            self.checks[check]["success"] = _parse_boolean(str(result))
 
-        failed_tests = _get_failed_checks(self.checks)
+        failed_tests = [
+            f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+            for check, check_values in self.checks.items()
+            if not check_values["success"]
+        ]
         if failed_tests:
-            raise AirflowException(
-                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
-                "The following tests have failed:"
-                f"\n{', '.join(failed_tests)}"
-            )
+            exception_string = f"""
+                Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n
+                The following tests have failed:
+                \n{', '.join(failed_tests)}
+            """

Review Comment:
   ```suggestion
               exception_string = (
                   f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
                   f"The following tests have failed:\n{', '.join(failed_tests)}"
               )
   ```
   
   Otherwise the exception message would contain weird indentation.



##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -46,18 +53,10 @@ def parse_boolean(val: str) -> str | bool:
     raise ValueError(f"{val!r} is not a boolean-like string value")
 
 
-def _get_failed_checks(checks, col=None):
-    if col:
-        return [
-            f"Column: {col}\nCheck: {check},\nCheck Values: {check_values}\n"
-            for check, check_values in checks.items()
-            if not check_values["success"]
-        ]
-    return [
-        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
-        for check, check_values in checks.items()
-        if not check_values["success"]
-    ]
+def _raise_exception(exception_string: str, retry_on_failure: bool) -> NoReturn:
+    if retry_on_failure:
+        raise AirflowException(exception_string)
+    raise AirflowFailException(exception_string)

Review Comment:
   Since this function is always called with `self.retry_on_failure`, perhaps it should be a method instead, so we can get rid of the second argument?
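
   A sketch of what the method form could look like (the class below is a simplified stand-in, not the actual operator hierarchy):

   ```python
   from typing import NoReturn

   from airflow.exceptions import AirflowException, AirflowFailException


   class SomeCheckOperator:  # stand-in for the real base check operator
       retry_on_failure: bool = True

       def _raise_exception(self, exception_string: str) -> NoReturn:
           # Uses the instance attribute, so no second argument is needed.
           if self.retry_on_failure:
               raise AirflowException(exception_string)
           raise AirflowFailException(exception_string)
   ```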





[GitHub] [airflow] denimalpaca commented on a diff in pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r994639740


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -241,12 +241,13 @@ def __init__(
         self.column_mapping = column_mapping
         self.partition_clause = partition_clause
 
-        checks_sql_list = []
-        for column, checks in self.column_mapping.items():
-            for check, check_values in checks.items():
-                self._column_mapping_validation(check, check_values)
-            checks_sql_list.append(self._generate_sql_query(column, checks))
-        checks_sql = "UNION ALL".join(checks_sql_list)
+        def _build_checks_sql():

Review Comment:
   This change may also need to be made in the BigQuery version of the operator!





[GitHub] [airflow] denimalpaca commented on pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on PR #26761:
URL: https://github.com/apache/airflow/pull/26761#issuecomment-1290735004

   @bhirsz do you have any further comments or suggestions? I would love to get this PR merged ASAP so it can make it into the next provider release.




[GitHub] [airflow] bhirsz commented on a diff in pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
bhirsz commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r982840431


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -33,7 +33,21 @@
     from airflow.utils.context import Context
 
 
-def parse_boolean(val: str) -> str | bool:
+def _convert_to_float_if_possible(s):
+    """
+    A small helper function to convert a string to a numeric value
+    if appropriate
+
+    :param s: the string to be converted
+    """
+    try:
+        ret = float(s)

Review Comment:
   Unnecessary variable; you can return directly (`return float(s)`, or `return s` in the `except` block).
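
   Combined with the type-hint suggestion above, the helper could reduce to something like this sketch (the `except (ValueError, TypeError)` clause is assumed from the original helper, since the diff is truncated):

   ```python
   from __future__ import annotations


   def _convert_to_float_if_possible(s: str) -> float | str:
       """Return the float value of `s` if it parses, otherwise `s` unchanged."""
       try:
           return float(s)
       except (ValueError, TypeError):
           return s
   ```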





[GitHub] [airflow] denimalpaca commented on a diff in pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r983584944


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -60,6 +74,12 @@ def _get_failed_checks(checks, col=None):
     ]
 
 
+def _raise_exception(exception_string, retry_on_failure):
+    if retry_on_failure:
+        raise AirflowException(exception_string)

Review Comment:
   This method comes from this [discussion](https://github.com/apache/airflow/discussions/26712#discussioncomment-3747662); the `AirflowFailException` should prevent the task from retrying.
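
   For context on why the distinction matters (this is standard Airflow behavior, not specific to this PR): raising `AirflowException` leaves the task eligible for its remaining retries, while `AirflowFailException` fails it immediately.

   ```python
   from airflow.exceptions import AirflowException, AirflowFailException


   def check_callable(strict: bool) -> None:
       # Illustrative only: AirflowException -> task may go to up_for_retry;
       # AirflowFailException -> task fails immediately, retries are skipped.
       if strict:
           raise AirflowFailException("data quality check failed; do not retry")
       raise AirflowException("data quality check failed; retry if retries remain")
   ```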





[GitHub] [airflow] potiuk commented on pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26761:
URL: https://github.com/apache/airflow/pull/26761#issuecomment-1291320385

   Unfortunately there are string normalization conflicts - please rebase and fix them :)




[GitHub] [airflow] uranusjr commented on a diff in pull request #26761: Common sql bugfixes and improvements

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r994147098


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -229,49 +236,75 @@ def __init__(
         **kwargs,
     ):
         super().__init__(conn_id=conn_id, database=database, **kwargs)
-        for checks in column_mapping.values():
-            for check, check_values in checks.items():
-                self._column_mapping_validation(check, check_values)
 
         self.table = table
         self.column_mapping = column_mapping
         self.partition_clause = partition_clause
-        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
-        self.sql = f"SELECT * FROM {self.table};"
+
+        checks_sql_list = []
+        for column, checks in self.column_mapping.items():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+            checks_sql_list.append(self._generate_sql_query(column, checks))
+        checks_sql = "UNION ALL".join(checks_sql_list)

Review Comment:
   ```suggestion
           def _build_checks_sql():
               for column, checks in self.column_mapping.items():
                   for check, check_values in checks.items():
                       self._column_mapping_validation(check, check_values)
                   yield self._generate_sql_query(column, checks)
   
           checks_sql = "UNION ALL".join(_build_checks_sql())
   ```




