Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/15 14:16:45 UTC

[GitHub] [airflow] denimalpaca opened a new pull request, #24476: Add Core SQL Provider

denimalpaca opened a new pull request, #24476:
URL: https://github.com/apache/airflow/pull/24476

   Adds operators, tests, and new and updated docs for a Core SQL
   Provider. This provider is created instead of adding these operators
   to the existing SQL operators in core Airflow. The new provider
   will allow for quicker development cycles.
   
   closes: #23874
   related: #23915 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1161785592

   BTW: @denimalpaca -> https://github.com/apache/airflow/pull/24581 I've added better instructions in both places: in the file that might conflict when multiple people modify breeze commands at the same time, and in the error message printed by pre-commit.
   
   It will explicitly tell you to run `breeze regenerate-command-images` in such cases, so that I don't have to do it in PRs :)




[GitHub] [airflow] potiuk commented on a diff in pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #24476:
URL: https://github.com/apache/airflow/pull/24476#discussion_r908408904


##########
airflow/providers/core/__init__.py:
##########
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   We can still change it - before we release the provider and I think we should only do it after we move DBApiHook there (as explained in #23971). 
   
   I think common might be a better choice indeed.





[GitHub] [airflow] potiuk commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1164973203

   The idea of the `core.sql` name came from me, and I think it makes sense for the following reasons:
   
   * this is not a provider that is usable on its own. You would not install just the "core-sql" provider - you would simply get it as a dependency of other providers that use "sql"
   
   * this provider - unlike the providers we have so far - does not provide a "specific" integration but provides common airflow functionality that should be released independently from "core" airflow. I believe we need a way to distinguish such "core extension" functionality from "regular" providers.
   
   * The idea behind it is that if - for example - you want to add "sql lineage" features (which I believe was the main reason for even thinking about such "sql-extraction"), you could add them to the "core sql" provider and release it with new "snowflake", "oracle", "mysql" providers depending on the new version of that "core sql" provider.
   
   * as a result, you could use better (I am guessing column-based) airflow lineage integration immediately when such a "core" provider is released, without a) waiting for airflow 2.4 or 2.5, or b) waiting until all SQL-related providers bump min-airflow-version to those 2.4 or 2.5 versions (or keeping a backwards-compatible integration for quite a while - which might be painful). If we added such a "column lineage" extension to DBAPI today, this is what it would take.
   
   * It does not have to be "core" - maybe a better name would be good, but I believe we somehow need to distinguish such "functional" providers from "actual integrations"
   
   * it is a bit of an experiment - indeed - we have not done anything like that before, but I think this is the only way if we want to add such multi-provider functionality faster and make it available also for users who use slightly (not too much) earlier versions of airflow. But I am totally open to other ideas here :).
   
   This is the reasoning behind the idea - happy to hear any comments, but I just wanted to provide context here.




[GitHub] [airflow] potiuk commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1157938313

   ("Rebase early, rebase often" is the mantra that everyone should follow.) BTW, you can do it easily with the drop-down ^^
   next to "Update branch" just above this comment.




[GitHub] [airflow] potiuk commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1161788468

   BTW @denimalpaca I just merged #24581 - so actually you might need to rebase again (and this time you should get the right instructions when you get a conflict!)




[GitHub] [airflow] denimalpaca commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1161737529

   Ok. I do have pre-commit installed and it definitely was running before all my commits, which is why I was so confused about the errors, because other errors _did_ come up in pre-commit that I fixed before pushing anything.
   
   The only document I see failing anymore is one that I did not touch, but I'll run that again locally and see.




[GitHub] [airflow] potiuk commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1161760294

   Besides - you might have to run the same as is done in `CI` - run pre-commit with `--all-files`. And if you carefully read what the static checks tell you, you will see the exact pre-commit and command to run to re-run it
   
   It looks like in this case the problem is with conflicting images of the command-line "help" output, because someone (likely me) added another command, so the images in your version and in main now conflict.
   
   You will need to run `breeze regenerate-command-images` to fix them




[GitHub] [airflow] kaxil merged pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
kaxil merged PR #24476:
URL: https://github.com/apache/airflow/pull/24476




[GitHub] [airflow] denimalpaca commented on a diff in pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #24476:
URL: https://github.com/apache/airflow/pull/24476#discussion_r911191856


##########
airflow/providers/core/sql/operators/sql.py:
##########
@@ -0,0 +1,315 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.sql import BaseSQLOperator
+
+
+def parse_boolean(val: str) -> Union[str, bool]:
+    """Try to parse a string into boolean.
+
+    Raises ValueError if the input is not a valid true- or false-like string value.
+    """
+    val = val.lower()
+    if val in ('y', 'yes', 't', 'true', 'on', '1'):
+        return True
+    if val in ('n', 'no', 'f', 'false', 'off', '0'):
+        return False
+    raise ValueError(f"{val!r} is not a boolean-like string value")
+
+
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+    Each check can take one or more of the following options:
+    - equal_to: an exact value to equal, cannot be used with other comparison options
+    - greater_than: value that result should be strictly greater than
+    - less_than: value that results should be strictly less than
+    - geq_to: value that results should be greater than or equal to
+    - leq_to: value that results should be less than or equal to
+    - tolerance: the percentage that the result may be off from the expected value
+
+    :param table: the table to run checks on
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.
+
+    .. code-block:: python
+
+        {
+            "col_name": {
+                "null_check": {
+                    "equal_to": 0,
+                },
+                "min": {
+                    "greater_than": 5,
+                    "leq_to": 10,
+                    "tolerance": 0.2,
+                },
+                "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
+            }
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLColumnCheckOperator`
+    """
+
+    column_checks = {
+        "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check",
+        "min": "MIN(column) AS column_min",
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+
+            if not records:
+                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+            self.log.info(f"Record: {records}")
+
+            for idx, result in enumerate(records):
+                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        match_boolean = True
+        if "geq_to" in check_values:
+            if tolerance is not None:
+                match_boolean = record >= check_values["geq_to"] * (1 - tolerance)
+            else:
+                match_boolean = record >= check_values["geq_to"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                match_boolean = record > check_values["greater_than"] * (1 - tolerance)
+            else:
+                match_boolean = record > check_values["greater_than"]
+        if "leq_to" in check_values:
+            if tolerance is not None:
+                match_boolean = record <= check_values["leq_to"] * (1 + tolerance) and match_boolean
+            else:
+                match_boolean = record <= check_values["leq_to"] and match_boolean
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                match_boolean = record < check_values["less_than"] * (1 + tolerance) and match_boolean
+            else:
+                match_boolean = record < check_values["less_than"] and match_boolean
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                match_boolean = (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                ) and match_boolean
+            else:
+                match_boolean = record == check_values["equal_to"] and match_boolean
+        return match_boolean
+
+    def _column_mapping_validation(self, check, check_values):
+        if check not in self.column_checks:
+            raise AirflowException(f"Invalid column check: {check}.")
+        if (
+            "greater_than" not in check_values
+            and "geq_to" not in check_values
+            and "less_than" not in check_values
+            and "leq_to" not in check_values
+            and "equal_to" not in check_values
+        ):
+            raise ValueError(
+                "Please provide one or more of: less_than, leq_to, "
+                "greater_than, geq_to, or equal_to in the check's dict."
+            )
+
+        if "greater_than" in check_values and "less_than" in check_values:
+            if check_values["greater_than"] >= check_values["less_than"]:
+                raise ValueError(
+                    "greater_than should be strictly less than "
+                    "less_than. Use geq_to or leq_to for "
+                    "overlapping equality."
+                )
+
+        if "greater_than" in check_values and "leq_to" in check_values:
+            if check_values["greater_than"] >= check_values["leq_to"]:
+                raise ValueError(
+                    "greater_than must be strictly less than leq_to. "
+                    "Use geq_to with leq_to for overlapping equality."
+                )
+
+        if "geq_to" in check_values and "less_than" in check_values:
+            if check_values["geq_to"] >= check_values["less_than"]:
+                raise ValueError(
+                    "geq_to should be strictly less than less_than. "
+                    "Use leq_to with geq_to for overlapping equality."
+                )
+
+        if "geq_to" in check_values and "leq_to" in check_values:
+            if check_values["geq_to"] > check_values["leq_to"]:
+                raise ValueError("geq_to should be less than or equal to leq_to.")
+
+        if "greater_than" in check_values and "geq_to" in check_values:
+            raise ValueError("Only supply one of greater_than or geq_to.")
+
+        if "less_than" in check_values and "leq_to" in check_values:
+            raise ValueError("Only supply one of less_than or leq_to.")
+
+        if (
+            "greater_than" in check_values
+            or "geq_to" in check_values
+            or "less_than" in check_values
+            or "leq_to" in check_values
+        ) and "equal_to" in check_values:
+            raise ValueError(
+                "equal_to cannot be passed with a greater or less than "
+                "function. To specify 'greater than or equal to' or "
+                "'less than or equal to', use geq_to or leq_to."
+            )
+
+
+class SQLTableCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the checks provided in the checks dictionary.
+    Checks should be written to return a boolean result.
+
+    :param table: the table to run checks on
+    :param checks: the dictionary of checks, e.g.:
+
+    .. code-block:: python
+
+        {
+            "row_count_check": {"check_statement": "COUNT(*) = 1000"},
+            "column_sum_check": {"check_statement": "col_a + col_b < col_c"},
+        }
+
+    :param conn_id: the connection ID used to connect to the database

Review Comment:
   Is that built if I run `breeze build-docs`? I can't seem to find a preview anywhere (and not sure that putting in a separate markdown file to preview is going to be an accurate reflection of this).
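
   For readers following the diff above, the way `execute` assembles one query per column and the way `_get_match` applies bounds with a tolerance can be sketched as standalone functions. This is a simplified illustration and not the PR's code: `build_check_sql` and `within_bounds` are hypothetical names, only a subset of the `column_checks` templates is copied, and a missing tolerance is treated as 0 (which gives the same result for numeric values).

```python
from typing import Any, Dict

# Subset of the templates from the diff's `column_checks` dictionary.
COLUMN_CHECKS = {
    "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
    "min": "MIN(column) AS column_min",
    "max": "MAX(column) AS column_max",
}


def build_check_sql(table: str, column: str, checks: Dict[str, Dict[str, Any]]) -> str:
    """Hypothetical helper: build one SELECT for all checks on a column.

    The placeholder word "column" in each template is replaced by the
    real column name, as the operator's execute() does in the diff.
    """
    parts = [COLUMN_CHECKS[name].replace("column", column) for name in checks]
    return f"SELECT {','.join(parts)} FROM {table};"


def within_bounds(check_values: Dict[str, Any], record: float) -> bool:
    """Hypothetical helper mirroring the comparisons in _get_match.

    Lower bounds are relaxed by (1 - tolerance), upper bounds by
    (1 + tolerance); equal_to accepts anything inside that band.
    """
    tol = check_values.get("tolerance") or 0  # assumption: no tolerance means 0
    ok = True
    if "geq_to" in check_values:
        ok = record >= check_values["geq_to"] * (1 - tol)
    elif "greater_than" in check_values:
        ok = record > check_values["greater_than"] * (1 - tol)
    if "leq_to" in check_values:
        ok = ok and record <= check_values["leq_to"] * (1 + tol)
    elif "less_than" in check_values:
        ok = ok and record < check_values["less_than"] * (1 + tol)
    if "equal_to" in check_values:
        target = check_values["equal_to"]
        ok = ok and target * (1 - tol) <= record <= target * (1 + tol)
    return ok


if __name__ == "__main__":
    mapping = {"null_check": {"equal_to": 0}, "min": {"geq_to": 10, "tolerance": 0.2}}
    print(build_check_sql("my_table", "col_a", mapping))
    # A minimum of 9 passes a geq_to of 10 once a 20% tolerance is allowed.
    print(within_bounds(mapping["min"], 9))
```

   Note that the tolerance only ever widens the accepted range, so a check that passes without a tolerance also passes with one (for non-negative bounds).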





[GitHub] [airflow] denimalpaca commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1157753124

   @kaxil @potiuk @mik-laj I'm not sure why these remaining checks are failing -- one seems to be for an Airflow doc I didn't touch, and the other is a breeze output.
   The breeze one is a bit nebulous: `Update output of breeze commands in BREEZE.rst.........................................Failed` seems like something breeze should be doing automatically.
   The other doc error seems to be around imports I didn't touch:
   `ModuleNotFoundError: No module named 'airflow.provider_info'`
   and a link error:
   `  apache-airflow-providers                                     /opt/airflow/docs/apache-airflow-providers/packages-ref.rst::Anonymous hyperlink mismatch: 1 references but 0 targets.`
   but no line number or other info, and in a file I didn't touch. Also, this command passes locally.




[GitHub] [airflow] denimalpaca commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1164870413

   > I don't think the location `/providers/core` is appropriate because `core` doesn't really give much information. Maybe we should move all the SQL operators in `airflow.operator` into the providers package at `/providers/sql`? Though, I don't really know why the SQL operators are still in core airflow, so it's something to check
   
   There are some issues with having a `providers.sql` because there technically is already one, and one gets added in one of the pre-commit tests for provider packages to make sure there aren't duplicate names.




[GitHub] [airflow] denimalpaca commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1158952904

   @potiuk rebased a few times now, still seeing these errors.




[GitHub] [airflow] potiuk commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1161755338

   Always rebase as a first check:
   
   ![image](https://user-images.githubusercontent.com/595491/174811915-4384dbbc-7360-43b6-bc88-ecf60604d859.png)
   




[GitHub] [airflow] potiuk commented on a diff in pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #24476:
URL: https://github.com/apache/airflow/pull/24476#discussion_r911459410


##########
airflow/providers/core/sql/operators/sql.py:
##########
@@ -0,0 +1,315 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.sql import BaseSQLOperator
+
+
+def parse_boolean(val: str) -> Union[str, bool]:
+    """Try to parse a string into boolean.
+
+    Raises ValueError if the input is not a valid true- or false-like string value.
+    """
+    val = val.lower()
+    if val in ('y', 'yes', 't', 'true', 'on', '1'):
+        return True
+    if val in ('n', 'no', 'f', 'false', 'off', '0'):
+        return False
+    raise ValueError(f"{val!r} is not a boolean-like string value")
+
+
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+    Each check can take one or more of the following options:
+    - equal_to: an exact value to equal, cannot be used with other comparison options
+    - greater_than: value that result should be strictly greater than
+    - less_than: value that results should be strictly less than
+    - geq_to: value that results should be greater than or equal to
+    - leq_to: value that results should be less than or equal to
+    - tolerance: the percentage that the result may be off from the expected value
+
+    :param table: the table to run checks on
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.
+
+    .. code-block:: python
+
+        {
+            "col_name": {
+                "null_check": {
+                    "equal_to": 0,
+                },
+                "min": {
+                    "greater_than": 5,
+                    "leq_to": 10,
+                    "tolerance": 0.2,
+                },
+                "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
+            }
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLColumnCheckOperator`
+    """
+
+    column_checks = {
+        "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check",
+        "min": "MIN(column) AS column_min",
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+
+            if not records:
+                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+            self.log.info(f"Record: {records}")
+
+            for idx, result in enumerate(records):
+                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        match_boolean = True
+        if "geq_to" in check_values:
+            if tolerance is not None:
+                match_boolean = record >= check_values["geq_to"] * (1 - tolerance)
+            else:
+                match_boolean = record >= check_values["geq_to"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                match_boolean = record > check_values["greater_than"] * (1 - tolerance)
+            else:
+                match_boolean = record > check_values["greater_than"]
+        if "leq_to" in check_values:
+            if tolerance is not None:
+                match_boolean = record <= check_values["leq_to"] * (1 + tolerance) and match_boolean
+            else:
+                match_boolean = record <= check_values["leq_to"] and match_boolean
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                match_boolean = record < check_values["less_than"] * (1 + tolerance) and match_boolean
+            else:
+                match_boolean = record < check_values["less_than"] and match_boolean
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                match_boolean = (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                ) and match_boolean
+            else:
+                match_boolean = record == check_values["equal_to"] and match_boolean
+        return match_boolean
+
+    def _column_mapping_validation(self, check, check_values):
+        if check not in self.column_checks:
+            raise AirflowException(f"Invalid column check: {check}.")
+        if (
+            "greater_than" not in check_values
+            and "geq_to" not in check_values
+            and "less_than" not in check_values
+            and "leq_to" not in check_values
+            and "equal_to" not in check_values
+        ):
+            raise ValueError(
+                "Please provide one or more of: less_than, leq_to, "
+                "greater_than, geq_to, or equal_to in the check's dict."
+            )
+
+        if "greater_than" in check_values and "less_than" in check_values:
+            if check_values["greater_than"] >= check_values["less_than"]:
+                raise ValueError(
+                    "greater_than should be strictly less than "
+                    "less_than. Use geq_to or leq_to for "
+                    "overlapping equality."
+                )
+
+        if "greater_than" in check_values and "leq_to" in check_values:
+            if check_values["greater_than"] >= check_values["leq_to"]:
+                raise ValueError(
+                    "greater_than must be strictly less than leq_to. "
+                    "Use geq_to with leq_to for overlapping equality."
+                )
+
+        if "geq_to" in check_values and "less_than" in check_values:
+            if check_values["geq_to"] >= check_values["less_than"]:
+                raise ValueError(
+                    "geq_to should be strictly less than less_than. "
+                    "Use leq_to with geq_to for overlapping equality."
+                )
+
+        if "geq_to" in check_values and "leq_to" in check_values:
+            if check_values["geq_to"] > check_values["leq_to"]:
+                raise ValueError("geq_to should be less than or equal to leq_to.")
+
+        if "greater_than" in check_values and "geq_to" in check_values:
+            raise ValueError("Only supply one of greater_than or geq_to.")
+
+        if "less_than" in check_values and "leq_to" in check_values:
+            raise ValueError("Only supply one of less_than or leq_to.")
+
+        if (
+            "greater_than" in check_values
+            or "geq_to" in check_values
+            or "less_than" in check_values
+            or "leq_to" in check_values
+        ) and "equal_to" in check_values:
+            raise ValueError(
+                "equal_to cannot be passed with a greater or less than "
+                "function. To specify 'greater than or equal to' or "
+                "'less than or equal to', use geq_to or leq_to."
+            )
+
+
+class SQLTableCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the checks provided in the checks dictionary.
+    Checks should be written to return a boolean result.
+
+    :param table: the table to run checks on
+    :param checks: the dictionary of checks, e.g.:
+
+    .. code-block:: python
+
+        {
+            "row_count_check": {"check_statement": "COUNT(*) = 1000"},
+            "column_sum_check": {"check_statement": "col_a + col_b < col_c"},
+        }
+
+    :param conn_id: the connection ID used to connect to the database

Review Comment:
   Yes. When you run `breeze build-docs --package-filter apache-airflow-providers-core-sql` and then `./docs/start-docs-server.sh` (or something similar), you should see the docs. I am quite sure this is described in the contributing docs.





[GitHub] [airflow] potiuk commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1157936831

   Please rebase onto the latest main. You are 28 commits behind main, so the changes might come from the outdated code your PR is based on. Rebasing should always be the first thing you do when you see failures that you don't recognize.




[GitHub] [airflow] denimalpaca commented on a diff in pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #24476:
URL: https://github.com/apache/airflow/pull/24476#discussion_r906013654


##########
airflow/providers/core/sql/operators/sql.py:
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.sql import BaseSQLOperator
+
+
+def parse_boolean(val: str) -> Union[str, bool]:
+    """Try to parse a string into boolean.
+
+    Raises ValueError if the input is not a valid true- or false-like string value.
+    """
+    val = val.lower()
+    if val in ('y', 'yes', 't', 'true', 'on', '1'):
+        return True
+    if val in ('n', 'no', 'f', 'false', 'off', '0'):
+        return False
+    raise ValueError(f"{val!r} is not a boolean-like string value")
+
+
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+    Each check can take one or more of the following options:
+    - equal_to: an exact value to equal, cannot be used with other comparison options
+    - greater_than: value that result should be strictly greater than
+    - less_than: value that results should be strictly less than
+    - geq_than: value that results should be greater than or equal to
+    - leq_than: value that results should be less than or equal to
+    - tolerance: the percentage that the result may be off from the expected value
+
+    :param table: the table to run checks on
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.
+
+    .. code-block:: python
+
+        {
+            "col_name": {
+                "null_check": {
+                    "equal_to": 0,
+                },
+                "min": {
+                    "greater_than": 5,
+                    "leq_than": 10,
+                    "tolerance": 0.2,
+                },
+                "max": {"less_than": 1000, "geq_than": 10, "tolerance": 0.01},
+            }
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLColumnCheckOperator`
+    """
+
+    column_checks = {
+        "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check",
+        "min": "MIN(column) AS column_min",
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+
+            if not records:
+                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+            self.log.info(f"Record: {records}")
+
+            for idx, result in enumerate(records):
+                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        if "geq_than" in check_values:
+            if tolerance is not None:
+                return record >= check_values["geq_than"] * (1 - tolerance)
+            return record >= check_values["geq_than"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                return record > check_values["greater_than"] * (1 - tolerance)
+            return record > check_values["greater_than"]
+        if "leq_than" in check_values:
+            if tolerance is not None:
+                return record <= check_values["leq_than"] * (1 + tolerance)
+            return record <= check_values["leq_than"]
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                return record < check_values["less_than"] * (1 + tolerance)
+            return record < check_values["less_than"]
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                return (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                )
+        return record == check_values["equal_to"]
+
+    def _column_mapping_validation(self, check, check_values):
+        if check not in self.column_checks:
+            raise AirflowException(f"Invalid column check: {check}.")
+        if (
+            "greater_than" not in check_values
+            and "geq_than" not in check_values
+            and "less_than" not in check_values
+            and "leq_than" not in check_values
+            and "equal_to" not in check_values
+        ):

Review Comment:
   I think that's a good change; grammatically it does make more sense.
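
For reference, a minimal standalone sketch of how the renamed keys (`geq_to`, `leq_to`) read in a range check with a tolerance. The `matches` helper is hypothetical and only mirrors the comparison logic of `_get_match` in this PR; it is not part of the operator's API, and it assumes non-negative bounds.

```python
from typing import Any, Dict, Optional


def matches(check_values: Dict[str, Any], record: float, tolerance: Optional[float] = None) -> bool:
    """Hypothetical helper: True if record satisfies every bound in check_values."""
    # Lower bounds are scaled by (1 - tolerance), upper bounds by (1 + tolerance).
    low = 1 - tolerance if tolerance is not None else 1
    high = 1 + tolerance if tolerance is not None else 1
    ok = True
    if "geq_to" in check_values:
        ok = ok and record >= check_values["geq_to"] * low
    elif "greater_than" in check_values:
        ok = ok and record > check_values["greater_than"] * low
    if "leq_to" in check_values:
        ok = ok and record <= check_values["leq_to"] * high
    elif "less_than" in check_values:
        ok = ok and record < check_values["less_than"] * high
    if "equal_to" in check_values:
        ok = ok and check_values["equal_to"] * low <= record <= check_values["equal_to"] * high
    return ok


# A result of 11 fails "leq_to": 10 outright, but passes once a 20% tolerance is allowed.
strict = matches({"greater_than": 5, "leq_to": 10}, 11)
relaxed = matches({"greater_than": 5, "leq_to": 10}, 11, tolerance=0.2)
```

Read aloud, "greater than or equal to" maps directly onto `geq_to`, which is the point of the rename.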





[GitHub] [airflow] kaxil commented on a diff in pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #24476:
URL: https://github.com/apache/airflow/pull/24476#discussion_r907767029


##########
airflow/providers/core/__init__.py:
##########
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   I am not a fan of "core.sql" either, like @ephraimbuddy -- maybe "base.sql" is better 🤷 but I do know that there already is a BaseSQLOperator -- which we should think of moving to a provider too tbh.
   
   But naming is hard. Tagging a few people in case they have better suggestions for names: @dstandish @jedcunningham @uranusjr 



##########
airflow/providers/core/sql/operators/sql.py:
##########
@@ -0,0 +1,315 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.sql import BaseSQLOperator
+
+
+def parse_boolean(val: str) -> Union[str, bool]:
+    """Try to parse a string into boolean.
+
+    Raises ValueError if the input is not a valid true- or false-like string value.
+    """
+    val = val.lower()
+    if val in ('y', 'yes', 't', 'true', 'on', '1'):
+        return True
+    if val in ('n', 'no', 'f', 'false', 'off', '0'):
+        return False
+    raise ValueError(f"{val!r} is not a boolean-like string value")
+
+
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+    Each check can take one or more of the following options:
+    - equal_to: an exact value to equal, cannot be used with other comparison options
+    - greater_than: value that result should be strictly greater than
+    - less_than: value that results should be strictly less than
+    - geq_to: value that results should be greater than or equal to
+    - leq_to: value that results should be less than or equal to
+    - tolerance: the percentage that the result may be off from the expected value
+
+    :param table: the table to run checks on
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.
+
+    .. code-block:: python
+
+        {
+            "col_name": {
+                "null_check": {
+                    "equal_to": 0,
+                },
+                "min": {
+                    "greater_than": 5,
+                    "leq_to": 10,
+                    "tolerance": 0.2,
+                },
+                "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
+            }
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLColumnCheckOperator`
+    """
+
+    column_checks = {
+        "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check",
+        "min": "MIN(column) AS column_min",
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+
+            if not records:
+                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+            self.log.info(f"Record: {records}")

Review Comment:
   ```suggestion
               self.log.info("Record: %s", records)
   ```
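
To illustrate the reasoning behind this suggestion: with `%`-style arguments, the standard `logging` module defers string formatting until a record is actually emitted, whereas an f-string is always rendered before the call. A minimal sketch using only the standard library; the `Expensive` class and the logger name are made up for the demonstration.

```python
import logging


class Expensive:
    """Hypothetical value that counts how often it is rendered to a string."""

    def __init__(self):
        self.renders = 0

    def __str__(self):
        self.renders += 1
        return "(1, 2, 3)"


log = logging.getLogger("lazy_logging_demo")
log.setLevel(logging.WARNING)  # INFO messages are filtered out for this logger

eager, lazy = Expensive(), Expensive()

log.info(f"Record: {eager}")  # the f-string is rendered before log.info is called
log.info("Record: %s", lazy)  # formatting is skipped because INFO is disabled
```

After running this, `eager.renders` is 1 and `lazy.renders` is 0, even though neither message is written anywhere.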



##########
airflow/providers/core/sql/operators/sql.py:
##########
@@ -0,0 +1,315 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.sql import BaseSQLOperator
+
+
+def parse_boolean(val: str) -> Union[str, bool]:
+    """Try to parse a string into boolean.
+
+    Raises ValueError if the input is not a valid true- or false-like string value.
+    """
+    val = val.lower()
+    if val in ('y', 'yes', 't', 'true', 'on', '1'):
+        return True
+    if val in ('n', 'no', 'f', 'false', 'off', '0'):
+        return False
+    raise ValueError(f"{val!r} is not a boolean-like string value")
+
+
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+    Each check can take one or more of the following options:
+    - equal_to: an exact value to equal, cannot be used with other comparison options
+    - greater_than: value that result should be strictly greater than
+    - less_than: value that results should be strictly less than
+    - geq_to: value that results should be greater than or equal to
+    - leq_to: value that results should be less than or equal to
+    - tolerance: the percentage that the result may be off from the expected value
+
+    :param table: the table to run checks on
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.
+
+    .. code-block:: python
+
+        {
+            "col_name": {
+                "null_check": {
+                    "equal_to": 0,
+                },
+                "min": {
+                    "greater_than": 5,
+                    "leq_to": 10,
+                    "tolerance": 0.2,
+                },
+                "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
+            }
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLColumnCheckOperator`
+    """
+
+    column_checks = {
+        "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check",
+        "min": "MIN(column) AS column_min",
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+
+            if not records:
+                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+            self.log.info(f"Record: {records}")
+
+            for idx, result in enumerate(records):
+                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        match_boolean = True
+        if "geq_to" in check_values:
+            if tolerance is not None:
+                match_boolean = record >= check_values["geq_to"] * (1 - tolerance)
+            else:
+                match_boolean = record >= check_values["geq_to"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                match_boolean = record > check_values["greater_than"] * (1 - tolerance)
+            else:
+                match_boolean = record > check_values["greater_than"]
+        if "leq_to" in check_values:
+            if tolerance is not None:
+                match_boolean = record <= check_values["leq_to"] * (1 + tolerance) and match_boolean
+            else:
+                match_boolean = record <= check_values["leq_to"] and match_boolean
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                match_boolean = record < check_values["less_than"] * (1 + tolerance) and match_boolean
+            else:
+                match_boolean = record < check_values["less_than"] and match_boolean
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                match_boolean = (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                ) and match_boolean
+            else:
+                match_boolean = record == check_values["equal_to"] and match_boolean
+        return match_boolean
+
+    def _column_mapping_validation(self, check, check_values):
+        if check not in self.column_checks:
+            raise AirflowException(f"Invalid column check: {check}.")
+        if (
+            "greater_than" not in check_values
+            and "geq_to" not in check_values
+            and "less_than" not in check_values
+            and "leq_to" not in check_values
+            and "equal_to" not in check_values
+        ):
+            raise ValueError(
+                "Please provide one or more of: less_than, leq_to, "
+                "greater_than, geq_to, or equal_to in the check's dict."
+            )
+
+        if "greater_than" in check_values and "less_than" in check_values:
+            if check_values["greater_than"] >= check_values["less_than"]:
+                raise ValueError(
+                    "greater_than should be strictly less than "
+                    "less_than. Use geq_to or leq_to for "
+                    "overlapping equality."
+                )
+
+        if "greater_than" in check_values and "leq_to" in check_values:
+            if check_values["greater_than"] >= check_values["leq_to"]:
+                raise ValueError(
+                    "greater_than must be strictly less than leq_to. "
+                    "Use geq_to with leq_to for overlapping equality."
+                )
+
+        if "geq_to" in check_values and "less_than" in check_values:
+            if check_values["geq_to"] >= check_values["less_than"]:
+                raise ValueError(
+                    "geq_to should be strictly less than less_than. "
+                    "Use leq_to with geq_to for overlapping equality."
+                )
+
+        if "geq_to" in check_values and "leq_to" in check_values:
+            if check_values["geq_to"] > check_values["leq_to"]:
+                raise ValueError("geq_to should be less than or equal to leq_to.")
+
+        if "greater_than" in check_values and "geq_to" in check_values:
+            raise ValueError("Only supply one of greater_than or geq_to.")
+
+        if "less_than" in check_values and "leq_to" in check_values:
+            raise ValueError("Only supply one of less_than or leq_to.")
+
+        if (
+            "greater_than" in check_values
+            or "geq_to" in check_values
+            or "less_than" in check_values
+            or "leq_to" in check_values
+        ) and "equal_to" in check_values:
+            raise ValueError(
+                "equal_to cannot be passed with a greater or less than "
+                "function. To specify 'greater than or equal to' or "
+                "'less than or equal to', use geq_to or leq_to."
+            )
+
+
+class SQLTableCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the checks provided in the checks dictionary.
+    Checks should be written to return a boolean result.
+
+    :param table: the table to run checks on
+    :param checks: the dictionary of checks, e.g.:
+
+    .. code-block:: python
+
+        {
+            "row_count_check": {"check_statement": "COUNT(*) = 1000"},
+            "column_sum_check": {"check_statement": "col_a + col_b < col_c"},
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLTableCheckOperator`
+    """
+
+    sql_check_template = "CASE WHEN check_statement THEN 1 ELSE 0 END AS check_name"
+    sql_min_template = "MIN(check_name)"
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        checks: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+
+        self.table = table
+        self.checks = checks
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+
+        check_names = [*self.checks]
+        check_mins_sql = ",".join(
+            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
+        )
+        checks_sql = ",".join(
+            [
+                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
+                    "check_name", check_name
+                )
+                for check_name, value in self.checks.items()
+            ]
+        )
+
+        self.sql = f"SELECT {check_mins_sql} FROM (SELECT {checks_sql} FROM {self.table});"
+        records = hook.get_first(self.sql)
+
+        if not records:
+            raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+        self.log.info(f"Record: {records}")

Review Comment:
   ```suggestion
           self.log.info("Record: %s", records)
   ```
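
For context, the query assembled in `execute` above can be reproduced outside Airflow. This is a rough sketch that assumes the two templates from the diff; `build_table_check_sql` is a hypothetical helper and not part of the PR.

```python
from typing import Any, Dict

SQL_CHECK_TEMPLATE = "CASE WHEN check_statement THEN 1 ELSE 0 END AS check_name"
SQL_MIN_TEMPLATE = "MIN(check_name)"


def build_table_check_sql(table: str, checks: Dict[str, Dict[str, Any]]) -> str:
    """Hypothetical helper mirroring the string substitution in SQLTableCheckOperator.execute."""
    # Outer query: one MIN(...) per check, so a single failing row yields 0.
    mins_sql = ",".join(SQL_MIN_TEMPLATE.replace("check_name", name) for name in checks)
    # Inner query: evaluate each check statement per row as 1 (pass) or 0 (fail).
    checks_sql = ",".join(
        SQL_CHECK_TEMPLATE.replace("check_statement", value["check_statement"]).replace("check_name", name)
        for name, value in checks.items()
    )
    return f"SELECT {mins_sql} FROM (SELECT {checks_sql} FROM {table});"


sql = build_table_check_sql("my_table", {"sum_check": {"check_statement": "col_a + col_b < col_c"}})
print(sql)
```

Because the templates are filled in with plain `str.replace`, a check statement that itself contains the text `check_name` would also be rewritten, which may be worth keeping in mind when naming columns.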



##########
airflow/providers/core/sql/operators/sql.py:
##########
@@ -0,0 +1,315 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.sql import BaseSQLOperator
+
+
+def parse_boolean(val: str) -> Union[str, bool]:
+    """Try to parse a string into boolean.
+
+    Raises ValueError if the input is not a valid true- or false-like string value.
+    """
+    val = val.lower()
+    if val in ('y', 'yes', 't', 'true', 'on', '1'):
+        return True
+    if val in ('n', 'no', 'f', 'false', 'off', '0'):
+        return False
+    raise ValueError(f"{val!r} is not a boolean-like string value")
+
+
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+    Each check can take one or more of the following options:
+    - equal_to: an exact value to equal, cannot be used with other comparison options
+    - greater_than: value that result should be strictly greater than
+    - less_than: value that results should be strictly less than
+    - geq_to: value that results should be greater than or equal to
+    - leq_to: value that results should be less than or equal to
+    - tolerance: the percentage that the result may be off from the expected value
+
+    :param table: the table to run checks on
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.
+
+    .. code-block:: python
+
+        {
+            "col_name": {
+                "null_check": {
+                    "equal_to": 0,
+                },
+                "min": {
+                    "greater_than": 5,
+                    "leq_to": 10,
+                    "tolerance": 0.2,
+                },
+                "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
+            }
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLColumnCheckOperator`
+    """
+
+    column_checks = {
+        "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check",
+        "min": "MIN(column) AS column_min",
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+
+            if not records:
+                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+            self.log.info(f"Record: {records}")
+
+            for idx, result in enumerate(records):
+                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        match_boolean = True
+        if "geq_to" in check_values:
+            if tolerance is not None:
+                match_boolean = record >= check_values["geq_to"] * (1 - tolerance)
+            else:
+                match_boolean = record >= check_values["geq_to"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                match_boolean = record > check_values["greater_than"] * (1 - tolerance)
+            else:
+                match_boolean = record > check_values["greater_than"]
+        if "leq_to" in check_values:
+            if tolerance is not None:
+                match_boolean = record <= check_values["leq_to"] * (1 + tolerance) and match_boolean
+            else:
+                match_boolean = record <= check_values["leq_to"] and match_boolean
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                match_boolean = record < check_values["less_than"] * (1 + tolerance) and match_boolean
+            else:
+                match_boolean = record < check_values["less_than"] and match_boolean
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                match_boolean = (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                ) and match_boolean
+            else:
+                match_boolean = record == check_values["equal_to"] and match_boolean
+        return match_boolean
+
+    def _column_mapping_validation(self, check, check_values):
+        if check not in self.column_checks:
+            raise AirflowException(f"Invalid column check: {check}.")
+        if (
+            "greater_than" not in check_values
+            and "geq_to" not in check_values
+            and "less_than" not in check_values
+            and "leq_to" not in check_values
+            and "equal_to" not in check_values
+        ):
+            raise ValueError(
+                "Please provide one or more of: less_than, leq_to, "
+                "greater_than, geq_to, or equal_to in the check's dict."
+            )
+
+        if "greater_than" in check_values and "less_than" in check_values:
+            if check_values["greater_than"] >= check_values["less_than"]:
+                raise ValueError(
+                    "greater_than should be strictly less than "
+                    "less_than. Use geq_to or leq_to for "
+                    "overlapping equality."
+                )
+
+        if "greater_than" in check_values and "leq_to" in check_values:
+            if check_values["greater_than"] >= check_values["leq_to"]:
+                raise ValueError(
+                    "greater_than must be strictly less than leq_to. "
+                    "Use geq_to with leq_to for overlapping equality."
+                )
+
+        if "geq_to" in check_values and "less_than" in check_values:
+            if check_values["geq_to"] >= check_values["less_than"]:
+                raise ValueError(
+                    "geq_to should be strictly less than less_than. "
+                    "Use leq_to with geq_to for overlapping equality."
+                )
+
+        if "geq_to" in check_values and "leq_to" in check_values:
+            if check_values["geq_to"] > check_values["leq_to"]:
+                raise ValueError("geq_to should be less than or equal to leq_to.")
+
+        if "greater_than" in check_values and "geq_to" in check_values:
+            raise ValueError("Only supply one of greater_than or geq_to.")
+
+        if "less_than" in check_values and "leq_to" in check_values:
+            raise ValueError("Only supply one of less_than or leq_to.")
+
+        if (
+            "greater_than" in check_values
+            or "geq_to" in check_values
+            or "less_than" in check_values
+            or "leq_to" in check_values
+        ) and "equal_to" in check_values:
+            raise ValueError(
+                "equal_to cannot be passed with a greater or less than "
+                "function. To specify 'greater than or equal to' or "
+                "'less than or equal to', use geq_to or leq_to."
+            )
+
+
+class SQLTableCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the checks provided in the checks dictionary.
+    Checks should be written to return a boolean result.
+
+    :param table: the table to run checks on
+    :param checks: the dictionary of checks, e.g.:
+
+    .. code-block:: python
+
+        {
+            "row_count_check": {"check_statement": "COUNT(*) = 1000"},
+            "column_sum_check": {"check_statement": "col_a + col_b < col_c"},
+        }
+
+    :param conn_id: the connection ID used to connect to the database

Review Comment:
   I think this should be indented further, but I'm not sure. Could you post a screenshot of how this renders in the docs?





[GitHub] [airflow] potiuk commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1159180264

   Yes, you need to install pre-commit and run those pre-commit checks locally. Those are real problems. Your docs also have errors that need to be fixed.




[GitHub] [airflow] denimalpaca commented on pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on PR #24476:
URL: https://github.com/apache/airflow/pull/24476#issuecomment-1162396280

   @potiuk I'm still not sure why the tests are failing. Now there is a `mypy` error that wasn't there before, and an `Error: Process completed with exit code 247.`, which seems to be a Docker resource issue. The branch was rebased onto the latest updates at the time of this comment.




[GitHub] [airflow] denimalpaca commented on a diff in pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #24476:
URL: https://github.com/apache/airflow/pull/24476#discussion_r906112005


##########
airflow/providers/core/sql/operators/sql.py:
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.sql import BaseSQLOperator
+
+
+def parse_boolean(val: str) -> Union[str, bool]:
+    """Try to parse a string into boolean.
+
+    Raises ValueError if the input is not a valid true- or false-like string value.
+    """
+    val = val.lower()
+    if val in ('y', 'yes', 't', 'true', 'on', '1'):
+        return True
+    if val in ('n', 'no', 'f', 'false', 'off', '0'):
+        return False
+    raise ValueError(f"{val!r} is not a boolean-like string value")
+
+
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+    Each check can take one or more of the following options:
+    - equal_to: an exact value to equal, cannot be used with other comparison options
+    - greater_than: value that result should be strictly greater than
+    - less_than: value that results should be strictly less than
+    - geq_than: value that results should be greater than or equal to
+    - leq_than: value that results should be less than or equal to
+    - tolerance: the percentage that the result may be off from the expected value
+
+    :param table: the table to run checks on
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.
+
+    .. code-block:: python
+
+        {
+            "col_name": {
+                "null_check": {
+                    "equal_to": 0,
+                },
+                "min": {
+                    "greater_than": 5,
+                    "leq_than": 10,
+                    "tolerance": 0.2,
+                },
+                "max": {"less_than": 1000, "geq_than": 10, "tolerance": 0.01},
+            }
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLColumnCheckOperator`
+    """
+
+    column_checks = {
+        "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check",
+        "min": "MIN(column) AS column_min",
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+
+            if not records:
+                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+            self.log.info(f"Record: {records}")
+
+            for idx, result in enumerate(records):
+                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        if "geq_than" in check_values:
+            if tolerance is not None:
+                return record >= check_values["geq_than"] * (1 - tolerance)
+            return record >= check_values["geq_than"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                return record > check_values["greater_than"] * (1 - tolerance)
+            return record > check_values["greater_than"]
+        if "leq_than" in check_values:
+            if tolerance is not None:
+                return record <= check_values["leq_than"] * (1 + tolerance)
+            return record <= check_values["leq_than"]
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                return record < check_values["less_than"] * (1 + tolerance)
+            return record < check_values["less_than"]
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                return (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                )
+        return record == check_values["equal_to"]

Review Comment:
   This is actually causing a static check error, because the function needs a non-nested return statement. As I noted in the commit message for this change, the placement of the return statement doesn't matter logic-wise: there is already a check that ensures `check_values` contains a valid comparison key, so a catch-all return on `equal_to` at the end should be fine.
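
One thing the early returns do affect: when a check supplies both a lower and an upper bound (as the docstring example does for `min` and `max`), returning from the first matching branch means the second bound is never evaluated. Below is a minimal standalone sketch, not the operator's actual code, that accumulates the result instead. `get_match` is a hypothetical free function, and it uses the `geq_than`/`leq_than` key names from this revision of the diff:

```python
from typing import Any, Dict, Optional


def get_match(check_values: Dict[str, Any], record: float, tolerance: Optional[float] = None) -> bool:
    """Return True only if record satisfies every bound present in check_values."""
    tol = tolerance or 0.0  # no tolerance is treated as zero slack
    match = True
    # Lower bounds: tolerance widens the bound downwards.
    if "geq_than" in check_values:
        match = match and record >= check_values["geq_than"] * (1 - tol)
    elif "greater_than" in check_values:
        match = match and record > check_values["greater_than"] * (1 - tol)
    # Upper bounds: tolerance widens the bound upwards.
    if "leq_than" in check_values:
        match = match and record <= check_values["leq_than"] * (1 + tol)
    elif "less_than" in check_values:
        match = match and record < check_values["less_than"] * (1 + tol)
    # Exact match, optionally within a tolerance band.
    if "equal_to" in check_values:
        expected = check_values["equal_to"]
        match = match and expected * (1 - tol) <= record <= expected * (1 + tol)
    return match
```

With `{"greater_than": 5, "leq_than": 10}` and a result of `50`, this sketch returns `False`, whereas a first-branch return would report success because only `50 > 5` is checked.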





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #24476:
URL: https://github.com/apache/airflow/pull/24476#discussion_r905811972


##########
airflow/providers/core/sql/operators/sql.py:
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.sql import BaseSQLOperator
+
+
+def parse_boolean(val: str) -> Union[str, bool]:
+    """Try to parse a string into boolean.
+
+    Raises ValueError if the input is not a valid true- or false-like string value.
+    """
+    val = val.lower()
+    if val in ('y', 'yes', 't', 'true', 'on', '1'):
+        return True
+    if val in ('n', 'no', 'f', 'false', 'off', '0'):
+        return False
+    raise ValueError(f"{val!r} is not a boolean-like string value")
+
+
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+    Each check can take one or more of the following options:
+    - equal_to: an exact value to equal, cannot be used with other comparison options
+    - greater_than: value that result should be strictly greater than
+    - less_than: value that results should be strictly less than
+    - geq_than: value that results should be greater than or equal to
+    - leq_than: value that results should be less than or equal to
+    - tolerance: the percentage that the result may be off from the expected value
+
+    :param table: the table to run checks on
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.
+
+    .. code-block:: python
+
+        {
+            "col_name": {
+                "null_check": {
+                    "equal_to": 0,
+                },
+                "min": {
+                    "greater_than": 5,
+                    "leq_than": 10,
+                    "tolerance": 0.2,
+                },
+                "max": {"less_than": 1000, "geq_than": 10, "tolerance": 0.01},
+            }
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLColumnCheckOperator`
+    """
+
+    column_checks = {
+        "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check",
+        "min": "MIN(column) AS column_min",
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+
+            if not records:
+                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+            self.log.info(f"Record: {records}")
+
+            for idx, result in enumerate(records):
+                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        if "geq_than" in check_values:
+            if tolerance is not None:
+                return record >= check_values["geq_than"] * (1 - tolerance)
+            return record >= check_values["geq_than"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                return record > check_values["greater_than"] * (1 - tolerance)
+            return record > check_values["greater_than"]
+        if "leq_than" in check_values:
+            if tolerance is not None:
+                return record <= check_values["leq_than"] * (1 + tolerance)
+            return record <= check_values["leq_than"]
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                return record < check_values["less_than"] * (1 + tolerance)
+            return record < check_values["less_than"]
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                return (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                )
+        return record == check_values["equal_to"]

Review Comment:
   ```suggestion
               return record == check_values["equal_to"]
   ```
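
A note on what nesting this return implies: if the final `return` lives only inside the `if "equal_to"` branch, the function has a path that falls off the end and returns `None` implicitly, which a type checker is likely to flag for a function annotated `-> bool`. A reduced, hypothetical sketch (these two functions are illustrative and not part of the PR):

```python
from typing import Any, Dict, Optional


def equal_to_nested(check_values: Dict[str, Any], record: float) -> Optional[bool]:
    """Reduced version with the final return nested under the `if`."""
    if "equal_to" in check_values:
        return record == check_values["equal_to"]
    # Falling off the end returns None implicitly; with a `-> bool` annotation
    # a type checker would report this path as a missing return.


def equal_to_explicit(check_values: Dict[str, Any], record: float) -> bool:
    """Same check, but every path ends in an explicit bool."""
    if "equal_to" in check_values:
        return record == check_values["equal_to"]
    return False  # assumption: treat a missing comparison key as a failed check
```

Whether the fallback should be `False`, an exception, or something else is a design choice for the PR author; the sketch only shows that the nested form leaves one path without a value.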
   



##########
airflow/providers/core/sql/operators/sql.py:
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.sql import BaseSQLOperator
+
+
+def parse_boolean(val: str) -> Union[str, bool]:
+    """Try to parse a string into boolean.
+
+    Raises ValueError if the input is not a valid true- or false-like string value.
+    """
+    val = val.lower()
+    if val in ('y', 'yes', 't', 'true', 'on', '1'):
+        return True
+    if val in ('n', 'no', 'f', 'false', 'off', '0'):
+        return False
+    raise ValueError(f"{val!r} is not a boolean-like string value")
+
+
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+    Each check can take one or more of the following options:
+    - equal_to: an exact value to equal, cannot be used with other comparison options
+    - greater_than: value that result should be strictly greater than
+    - less_than: value that results should be strictly less than
+    - geq_than: value that results should be greater than or equal to
+    - leq_than: value that results should be less than or equal to
+    - tolerance: the percentage that the result may be off from the expected value
+
+    :param table: the table to run checks on
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.
+
+    .. code-block:: python
+
+        {
+            "col_name": {
+                "null_check": {
+                    "equal_to": 0,
+                },
+                "min": {
+                    "greater_than": 5,
+                    "leq_than": 10,
+                    "tolerance": 0.2,
+                },
+                "max": {"less_than": 1000, "geq_than": 10, "tolerance": 0.01},
+            }
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLColumnCheckOperator`
+    """
+
+    column_checks = {
+        "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check",
+        "min": "MIN(column) AS column_min",
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+
+            if not records:
+                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+            self.log.info(f"Record: {records}")
+
+            for idx, result in enumerate(records):
+                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        if "geq_than" in check_values:
+            if tolerance is not None:
+                return record >= check_values["geq_than"] * (1 - tolerance)
+            return record >= check_values["geq_than"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                return record > check_values["greater_than"] * (1 - tolerance)
+            return record > check_values["greater_than"]
+        if "leq_than" in check_values:
+            if tolerance is not None:
+                return record <= check_values["leq_than"] * (1 + tolerance)
+            return record <= check_values["leq_than"]
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                return record < check_values["less_than"] * (1 + tolerance)
+            return record < check_values["less_than"]
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                return (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                )
+        return record == check_values["equal_to"]
+
+    def _column_mapping_validation(self, check, check_values):
+        if check not in self.column_checks:
+            raise AirflowException(f"Invalid column check: {check}.")
+        if (
+            "greater_than" not in check_values
+            and "geq_than" not in check_values
+            and "less_than" not in check_values
+            and "leq_than" not in check_values
+            and "equal_to" not in check_values
+        ):

Review Comment:
   Should `geq_than` and `leq_than` be renamed to `geq_to` and `leq_to`, respectively? I'm assuming `geq_than` means `greater_than_or_equal_to`.
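
For reference, the `_get_match` branches in the diff above do appear to treat `geq_than` and `leq_than` as inclusive bounds (`>=` and `<=`), loosened by `tolerance` when one is given. A minimal standalone sketch of that reading; the helper names `geq` and `leq` are hypothetical and not part of the PR:

```python
from typing import Optional


def geq(record: float, bound: float, tolerance: Optional[float] = None) -> bool:
    """Mirror of the `geq_than` branch: record >= bound, inclusive."""
    if tolerance is not None:
        # The lower bound is relaxed by the given fraction.
        return record >= bound * (1 - tolerance)
    return record >= bound


def leq(record: float, bound: float, tolerance: Optional[float] = None) -> bool:
    """Mirror of the `leq_than` branch: record <= bound, inclusive."""
    if tolerance is not None:
        # The upper bound is relaxed by the given fraction.
        return record <= bound * (1 + tolerance)
    return record <= bound
```

Both helpers accept a record equal to the bound, which is the "or equal to" part the option names are meant to convey.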





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #24476:
URL: https://github.com/apache/airflow/pull/24476#discussion_r906159148


##########
airflow/providers/core/sql/operators/sql.py:
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.sql import BaseSQLOperator
+
+
+def parse_boolean(val: str) -> Union[str, bool]:
+    """Try to parse a string into boolean.
+
+    Raises ValueError if the input is not a valid true- or false-like string value.
+    """
+    val = val.lower()
+    if val in ('y', 'yes', 't', 'true', 'on', '1'):
+        return True
+    if val in ('n', 'no', 'f', 'false', 'off', '0'):
+        return False
+    raise ValueError(f"{val!r} is not a boolean-like string value")
+
+
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+    Each check can take one or more of the following options:
+    - equal_to: an exact value to equal, cannot be used with other comparison options
+    - greater_than: value that result should be strictly greater than
+    - less_than: value that results should be strictly less than
+    - geq_than: value that results should be greater than or equal to
+    - leq_than: value that results should be less than or equal to
+    - tolerance: the percentage that the result may be off from the expected value
+
+    :param table: the table to run checks on
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.
+
+    .. code-block:: python
+
+        {
+            "col_name": {
+                "null_check": {
+                    "equal_to": 0,
+                },
+                "min": {
+                    "greater_than": 5,
+                    "leq_than": 10,
+                    "tolerance": 0.2,
+                },
+                "max": {"less_than": 1000, "geq_than": 10, "tolerance": 0.01},
+            }
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLColumnCheckOperator`
+    """
+
+    column_checks = {
+        "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check",
+        "min": "MIN(column) AS column_min",
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+
+            if not records:
+                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+            self.log.info(f"Record: {records}")
+
+            for idx, result in enumerate(records):
+                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        if "geq_than" in check_values:
+            if tolerance is not None:
+                return record >= check_values["geq_than"] * (1 - tolerance)
+            return record >= check_values["geq_than"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                return record > check_values["greater_than"] * (1 - tolerance)
+            return record > check_values["greater_than"]
+        if "leq_than" in check_values:
+            if tolerance is not None:
+                return record <= check_values["leq_than"] * (1 + tolerance)
+            return record <= check_values["leq_than"]
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                return record < check_values["less_than"] * (1 + tolerance)
+            return record < check_values["less_than"]
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                return (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                )
+        return record == check_values["equal_to"]

Review Comment:
   This will likely raise a `KeyError` if `equal_to` is not in `check_values`. Instead, nest this return statement inside the `if "equal_to" in check_values:` block and return `False` at the end.
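
A minimal standalone sketch of that nesting, reduced to the `equal_to` branch. The function name `match_equal_to` is hypothetical; in the PR this logic sits at the end of `_get_match`:

```python
from typing import Any, Dict, Optional


def match_equal_to(
    check_values: Dict[str, Any], record: float, tolerance: Optional[float] = None
) -> bool:
    """Compare record against check_values["equal_to"], if that option is present."""
    if "equal_to" in check_values:
        if tolerance is not None:
            # Accept any record within +/- tolerance (a fraction) of the expected value.
            return (
                check_values["equal_to"] * (1 - tolerance)
                <= record
                <= check_values["equal_to"] * (1 + tolerance)
            )
        # Nested: only reached when "equal_to" is actually a key.
        return record == check_values["equal_to"]
    # No "equal_to" option: report a failed match instead of raising KeyError.
    return False
```

With the un-nested return, a mapping that lacks `equal_to` would hit `check_values["equal_to"]` and raise `KeyError`; in this sketch it returns `False` instead.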





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #24476:
URL: https://github.com/apache/airflow/pull/24476#discussion_r905811972


##########
airflow/providers/core/sql/operators/sql.py:
##########
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Any, Dict, Optional, Union
+
+from airflow.exceptions import AirflowException
+from airflow.operators.sql import BaseSQLOperator
+
+
+def parse_boolean(val: str) -> Union[str, bool]:
+    """Try to parse a string into boolean.
+
+    Raises ValueError if the input is not a valid true- or false-like string value.
+    """
+    val = val.lower()
+    if val in ('y', 'yes', 't', 'true', 'on', '1'):
+        return True
+    if val in ('n', 'no', 'f', 'false', 'off', '0'):
+        return False
+    raise ValueError(f"{val!r} is not a boolean-like string value")
+
+
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+    Each check can take one or more of the following options:
+    - equal_to: an exact value to equal, cannot be used with other comparison options
+    - greater_than: value that result should be strictly greater than
+    - less_than: value that results should be strictly less than
+    - geq_than: value that results should be greater than or equal to
+    - leq_than: value that results should be less than or equal to
+    - tolerance: the percentage that the result may be off from the expected value
+
+    :param table: the table to run checks on
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.
+
+    .. code-block:: python
+
+        {
+            "col_name": {
+                "null_check": {
+                    "equal_to": 0,
+                },
+                "min": {
+                    "greater_than": 5,
+                    "leq_than": 10,
+                    "tolerance": 0.2,
+                },
+                "max": {"less_than": 1000, "geq_than": 10, "tolerance": 0.01},
+            }
+        }
+
+    :param conn_id: the connection ID used to connect to the database
+    :param database: name of database which overwrite the defined one in connection
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:SQLColumnCheckOperator`
+    """
+
+    column_checks = {
+        "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        "unique_check": "COUNT(column) - COUNT(DISTINCT(column)) AS column_unique_check",
+        "min": "MIN(column) AS column_min",
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+
+            if not records:
+                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+
+            self.log.info(f"Record: {records}")
+
+            for idx, result in enumerate(records):
+                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        if "geq_than" in check_values:
+            if tolerance is not None:
+                return record >= check_values["geq_than"] * (1 - tolerance)
+            return record >= check_values["geq_than"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                return record > check_values["greater_than"] * (1 - tolerance)
+            return record > check_values["greater_than"]
+        if "leq_than" in check_values:
+            if tolerance is not None:
+                return record <= check_values["leq_than"] * (1 + tolerance)
+            return record <= check_values["leq_than"]
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                return record < check_values["less_than"] * (1 + tolerance)
+            return record < check_values["less_than"]
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                return (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                )
+        return record == check_values["equal_to"]

Review Comment:
   ```suggestion
               return record == check_values["equal_to"]
           return False
   ```
   





[GitHub] [airflow] jedcunningham commented on a diff in pull request #24476: Add Core SQL Provider

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #24476:
URL: https://github.com/apache/airflow/pull/24476#discussion_r907820703


##########
airflow/providers/core/__init__.py:
##########
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   Maybe "common" would be better than "core"?


