Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/19 18:21:04 UTC

[GitHub] [airflow] denimalpaca opened a new pull request, #25164: Common SQLCheckOperators Various Functionality Update

denimalpaca opened a new pull request, #25164:
URL: https://github.com/apache/airflow/pull/25164

   The SQLCheckOperators in the Common SQL provider have had several issues since their initial release, including:
   - Lack of batching
   - A bug in the `SQLTableCheckOperator` that forces users to use only fully aggregated checks
   - A bug in the test-failure exception of the `SQLColumnCheckOperator`, which only ever reports the last SQL query built
   
   This PR remedies these issues.
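The third bug follows a familiar pattern: if a failure message is built from a variable that is reassigned on every loop iteration, only the last query survives. A minimal sketch of one way to avoid it, using hypothetical helper names (`build_column_queries`, `format_failure`) and plain SQL aggregates rather than the operator's real check templates, keeps every query keyed by its column:

```python
from typing import Dict, List


def build_column_queries(table: str, column_checks: Dict[str, List[str]]) -> Dict[str, str]:
    """Build one query per column and keep all of them (hypothetical helper)."""
    queries: Dict[str, str] = {}
    for column, aggregates in column_checks.items():
        # One aggregate expression per requested check on this column, e.g. MIN(id).
        select_list = ", ".join(f"{agg}({column}) AS {column}_{agg.lower()}" for agg in aggregates)
        # Store under the column name instead of overwriting a single `sql` variable.
        queries[column] = f"SELECT {select_list} FROM {table};"
    return queries


def format_failure(failed_columns: List[str], queries: Dict[str, str]) -> str:
    """Report the query behind each failing column, not just the last one built."""
    details = [f"Column '{col}' failed. Query: {queries[col]}" for col in failed_columns]
    return "Test failed.\n" + "\n".join(details)


queries = build_column_queries("orders", {"id": ["MIN", "MAX"], "amount": ["MIN"]})
message = format_failure(["id", "amount"], queries)
print(message)
```

Because each query is stored rather than overwritten, the exception text can name the exact statement for every failing column.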
   
   closes: #25163 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r955121759


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -273,38 +303,38 @@ def __init__(
 
         self.table = table
         self.checks = checks
+        self.partition_clause = partition_clause
         # OpenLineage needs a valid SQL query with the input/output table(s) to parse
         self.sql = f"SELECT * FROM {self.table};"
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-
-        check_names = [*self.checks]
-        check_mins_sql = ",".join(
-            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
-        )
-        checks_sql = ",".join(
+        checks_sql = " UNION ALL ".join(
             [
-                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
-                    "check_name", check_name
-                )
+                self.sql_check_template.replace("check_statement", value["check_statement"])
+                .replace("_check_name", check_name)
+                .replace("table", self.table)
                 for check_name, value in self.checks.items()
             ]
         )
+        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
+        self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
+        f"AS check_table {partition_clause_statement};"

Review Comment:
   Please. 





[GitHub] [airflow] potiuk merged pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #25164:
URL: https://github.com/apache/airflow/pull/25164




[GitHub] [airflow] eladkal commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r926859985


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -73,6 +79,7 @@ class SQLColumnCheckOperator(BaseSQLOperator):
             }
         }
 
+    :param batch: a SQL statement that is added to a WHERE clause to create batches

Review Comment:
   I'm not sure this parameter name is self-explanatory, and even after reading the description I still don't understand how to use it (I had to check the example/test/source code to understand it).
   Can we make it self-explanatory?





[GitHub] [airflow] denimalpaca commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r955014928


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -273,38 +303,38 @@ def __init__(
 
         self.table = table
         self.checks = checks
+        self.partition_clause = partition_clause
         # OpenLineage needs a valid SQL query with the input/output table(s) to parse
         self.sql = f"SELECT * FROM {self.table};"
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-
-        check_names = [*self.checks]
-        check_mins_sql = ",".join(
-            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
-        )
-        checks_sql = ",".join(
+        checks_sql = " UNION ALL ".join(
             [
-                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
-                    "check_name", check_name
-                )
+                self.sql_check_template.replace("check_statement", value["check_statement"])
+                .replace("_check_name", check_name)
+                .replace("table", self.table)
                 for check_name, value in self.checks.items()
             ]
         )
+        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
+        self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
+        f"AS check_table {partition_clause_statement};"
 
-        self.sql = f"SELECT {check_mins_sql} FROM (SELECT {checks_sql} FROM {self.table});"
-        records = hook.get_first(self.sql)
+        records = hook.get_pandas_df(self.sql)

Review Comment:
   So, this was changed because, with `hook.get_first`, the way the SQL was written meant that *only* fully aggregated checks could be returned. Fixing that required changing the query syntax, and the new query returns multiple rows, so it needs either a `fetch_all` or a `get_pandas_df` call. Using pandas here seemed much easier and possibly more efficient, but if `fetch_all` seems more reasonable this can be changed. Happy to explain more about the specific issue if you're curious.
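The multi-row point can be illustrated with Python's built-in `sqlite3` module. The sketch below assumes its own per-check template (one `SELECT` per check, joined with `UNION ALL`); it is not the operator's actual `sql_check_template`. A single-row fetch, which is what `get_first` does, sees only one check, while a multi-row fetch sees all of them:

```python
import sqlite3
from typing import Dict

# Hypothetical per-check template: each check yields exactly one (check_name, check_result) row.
CHECK_TEMPLATE = (
    "SELECT '{name}' AS check_name, MIN(ok) AS check_result "
    "FROM (SELECT CASE WHEN {statement} THEN 1 ELSE 0 END AS ok FROM {table}) AS sq"
)


def build_table_check_sql(table: str, checks: Dict[str, str]) -> str:
    """Join one SELECT per check with UNION ALL, giving one result row per check."""
    return " UNION ALL ".join(
        CHECK_TEMPLATE.format(name=name, statement=statement, table=table)
        for name, statement in checks.items()
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 10.0), (2, 25.5), (3, -1.0)])

sql = build_table_check_sql(
    "orders",
    {
        "row_count_check": "(SELECT COUNT(*) FROM orders) >= 3",
        "amount_positive": "amount > 0",
    },
)
first_row = conn.execute(sql).fetchone()  # single-row fetch: only one check's result
all_rows = conn.execute(sql).fetchall()   # multi-row fetch: every check's result
```

Named `str.format` placeholders are used here instead of chained `str.replace` calls because `replace` substitutes every occurrence of a substring, so a call such as `.replace("table", ...)` would also rewrite that word wherever it appears inside a check statement.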





[GitHub] [airflow] denimalpaca commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r957336168


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -273,38 +303,38 @@ def __init__(
 
         self.table = table
         self.checks = checks
+        self.partition_clause = partition_clause
         # OpenLineage needs a valid SQL query with the input/output table(s) to parse
         self.sql = f"SELECT * FROM {self.table};"
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-
-        check_names = [*self.checks]
-        check_mins_sql = ",".join(
-            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
-        )
-        checks_sql = ",".join(
+        checks_sql = " UNION ALL ".join(
             [
-                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
-                    "check_name", check_name
-                )
+                self.sql_check_template.replace("check_statement", value["check_statement"])
+                .replace("_check_name", check_name)
+                .replace("table", self.table)
                 for check_name, value in self.checks.items()
             ]
         )
+        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
+        self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
+        f"AS check_table {partition_clause_statement};"
 
-        self.sql = f"SELECT {check_mins_sql} FROM (SELECT {checks_sql} FROM {self.table});"
-        records = hook.get_first(self.sql)
+        records = hook.get_pandas_df(self.sql)

Review Comment:
   Ideally most users won't need to learn why `.fetch_all` is being used instead of `.get_first` 🙃. I'm working with some users of the operator right now to see what they find complicated, so that the docs are robust. I also have a working example DAG showing how to use the operator (with several more planned) [here](https://registry.astronomer.io/providers/common-sql/modules/sqltablecheckoperator/#example-dags).





[GitHub] [airflow] potiuk commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r926867814


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -73,6 +79,7 @@ class SQLColumnCheckOperator(BaseSQLOperator):
             }
         }
 
+    :param batch: a SQL statement that is added to a WHERE clause to create batches

Review Comment:
   Agree. I also would not have guessed what it is about





[GitHub] [airflow] ashb commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r950065201


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -273,38 +303,38 @@ def __init__(
 
         self.table = table
         self.checks = checks
+        self.partition_clause = partition_clause
         # OpenLineage needs a valid SQL query with the input/output table(s) to parse
         self.sql = f"SELECT * FROM {self.table};"
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-
-        check_names = [*self.checks]
-        check_mins_sql = ",".join(
-            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
-        )
-        checks_sql = ",".join(
+        checks_sql = " UNION ALL ".join(
             [
-                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
-                    "check_name", check_name
-                )
+                self.sql_check_template.replace("check_statement", value["check_statement"])
+                .replace("_check_name", check_name)
+                .replace("table", self.table)
                 for check_name, value in self.checks.items()
             ]
         )
+        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
+        self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
+        f"AS check_table {partition_clause_statement};"

Review Comment:
   Whoops. Missing `+` here. This second string is just a standalone literal, not part of `self.sql` (so `partition_clause_statement` is never used)!
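Python only joins adjacent string literals when they belong to the same expression. Here the first line is a complete assignment statement, so the second f-string becomes a separate expression statement whose value is discarded. A small reproduction, with a made-up `checks_sql` and partition clause standing in for the operator's real values:

```python
partition_clause = "order_date >= '2022-01-01'"
checks_sql = "SELECT 'row_count_check' AS check_name, 1 AS check_result"
partition_clause_statement = f"WHERE {partition_clause}" if partition_clause else ""

# As written in the diff: the second f-string is its own statement and is thrown away,
# so the WHERE clause never reaches `buggy_sql`.
buggy_sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
f"AS check_table {partition_clause_statement};"

# Inside parentheses the two adjacent literals are concatenated into one string.
fixed_sql = (
    f"SELECT check_name, check_result FROM ({checks_sql}) "
    f"AS check_table {partition_clause_statement};"
)
```

Appending `+` to the end of the first line would not be enough on its own: outside brackets the newline ends the statement, so that would be a `SyntaxError`. Wrapping the expression in parentheses (with or without an explicit `+`) is the usual fix.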



##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -273,38 +303,38 @@ def __init__(
 
         self.table = table
         self.checks = checks
+        self.partition_clause = partition_clause
         # OpenLineage needs a valid SQL query with the input/output table(s) to parse
         self.sql = f"SELECT * FROM {self.table};"
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-
-        check_names = [*self.checks]
-        check_mins_sql = ",".join(
-            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
-        )
-        checks_sql = ",".join(
+        checks_sql = " UNION ALL ".join(
             [
-                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
-                    "check_name", check_name
-                )
+                self.sql_check_template.replace("check_statement", value["check_statement"])
+                .replace("_check_name", check_name)
+                .replace("table", self.table)
                 for check_name, value in self.checks.items()
             ]
         )
+        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
+        self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
+        f"AS check_table {partition_clause_statement};"
 
-        self.sql = f"SELECT {check_mins_sql} FROM (SELECT {checks_sql} FROM {self.table});"
-        records = hook.get_first(self.sql)
+        records = hook.get_pandas_df(self.sql)

Review Comment:
   Why did we change from getting records to getting the result as a pandas DataFrame? This now places a _hard_ requirement on pandas for this operator, whereas previously pandas was almost entirely optional.
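For comparison, a multi-row fetch that returns plain tuples, such as `DbApiHook.get_records`, is enough to evaluate the check results without pandas. A minimal sketch, assuming each row is a `(check_name, check_result)` pair where a truthy result means the check passed; the helper name `evaluate_check_rows` is hypothetical:

```python
from typing import Dict, List, Optional, Sequence, Tuple

Row = Tuple[str, Optional[int]]


def evaluate_check_rows(rows: Sequence[Row]) -> Tuple[Dict[str, bool], List[str]]:
    """Map each check name to pass/fail and list the failures, using only the stdlib."""
    # A NULL result (e.g. a check evaluated over zero rows) is treated as a failure here.
    results = {name: bool(result) for name, result in rows}
    failed = [name for name, passed in results.items() if not passed]
    return results, failed


# Rows shaped like a list-of-tuples fetch result (sample values, not real output).
rows = [("row_count_check", 1), ("amount_positive", 0), ("empty_partition", None)]
results, failed = evaluate_check_rows(rows)
if failed:
    print(f"Checks failed: {', '.join(failed)}")
```

Treating `NULL` as a failure is a choice this sketch makes; the operator may handle it differently.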





[GitHub] [airflow] potiuk commented on pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25164:
URL: https://github.com/apache/airflow/pull/25164#issuecomment-1191316769

   Some tests are failing






[GitHub] [airflow] denimalpaca commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r955009648


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -273,38 +303,38 @@ def __init__(
 
         self.table = table
         self.checks = checks
+        self.partition_clause = partition_clause
         # OpenLineage needs a valid SQL query with the input/output table(s) to parse
         self.sql = f"SELECT * FROM {self.table};"
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-
-        check_names = [*self.checks]
-        check_mins_sql = ",".join(
-            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
-        )
-        checks_sql = ",".join(
+        checks_sql = " UNION ALL ".join(
             [
-                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
-                    "check_name", check_name
-                )
+                self.sql_check_template.replace("check_statement", value["check_statement"])
+                .replace("_check_name", check_name)
+                .replace("table", self.table)
                 for check_name, value in self.checks.items()
             ]
         )
+        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
+        self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
+        f"AS check_table {partition_clause_statement};"

Review Comment:
   Just got back from vacation; I see this got merged without a fix here. I can open a separate PR to address it.





[GitHub] [airflow] denimalpaca commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r926969512


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -73,6 +79,7 @@ class SQLColumnCheckOperator(BaseSQLOperator):
             }
         }
 
+    :param batch: a SQL statement that is added to a WHERE clause to create batches

Review Comment:
   Yep. The idea here is to let someone avoid running a check on the full table; they could partition by date or some other field instead. I clarified the docstring and added an example.
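A minimal sketch of the partitioning idea, again using `sqlite3` and a hypothetical `build_check_sql` helper. One design choice worth flagging: here the `WHERE` clause is placed inside each per-check subquery, so it can reference the source table's columns. In the `SQLTableCheckOperator` diff elsewhere in this thread, the clause is attached to the outer query over `check_table`, which (judging by its outer `SELECT`) exposes only `check_name` and `check_result`.

```python
import sqlite3
from typing import Dict, Optional


def build_check_sql(table: str, checks: Dict[str, str], partition_clause: Optional[str] = None) -> str:
    """Hypothetical builder: one row per check, optionally limited to a partition."""
    # The filter goes inside each subquery so it can use the table's own columns.
    # The clause is interpolated as raw SQL, so it must come from trusted DAG code.
    where = f"WHERE {partition_clause}" if partition_clause else ""
    parts = [
        f"SELECT '{name}' AS check_name, MIN(ok) AS check_result "
        f"FROM (SELECT CASE WHEN {statement} THEN 1 ELSE 0 END AS ok FROM {table} {where}) AS sq"
        for name, statement in checks.items()
    ]
    return " UNION ALL ".join(parts)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_date TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("2022-07-01", -5.0), ("2022-08-01", 12.0), ("2022-08-02", 30.0)],
)

checks = {"amount_positive": "amount > 0"}
full_table = dict(conn.execute(build_check_sql("orders", checks)).fetchall())
august_only = dict(
    conn.execute(build_check_sql("orders", checks, "order_date >= '2022-08-01'")).fetchall()
)
```

The July row with a negative amount fails the check on the full table, but the August-only partition passes. Note that an empty partition makes `MIN` return `NULL`, so the caller has to decide how to treat that case.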





[GitHub] [airflow] potiuk commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r956520309


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -273,38 +303,38 @@ def __init__(
 
         self.table = table
         self.checks = checks
+        self.partition_clause = partition_clause
         # OpenLineage needs a valid SQL query with the input/output table(s) to parse
         self.sql = f"SELECT * FROM {self.table};"
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-
-        check_names = [*self.checks]
-        check_mins_sql = ",".join(
-            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
-        )
-        checks_sql = ",".join(
+        checks_sql = " UNION ALL ".join(
             [
-                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
-                    "check_name", check_name
-                )
+                self.sql_check_template.replace("check_statement", value["check_statement"])
+                .replace("_check_name", check_name)
+                .replace("table", self.table)
                 for check_name, value in self.checks.items()
             ]
         )
+        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
+        self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
+        f"AS check_table {partition_clause_statement};"
 
-        self.sql = f"SELECT {check_mins_sql} FROM (SELECT {checks_sql} FROM {self.table});"
-        records = hook.get_first(self.sql)
+        records = hook.get_pandas_df(self.sql)

Review Comment:
   Thanks! Is the operator easy for users to understand? I am afraid this is something we need a good, clear how-to and examples for, because otherwise people will have a hard time using it and raise too many questions.





[GitHub] [airflow] potiuk commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r955132454


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -273,38 +303,38 @@ def __init__(
 
         self.table = table
         self.checks = checks
+        self.partition_clause = partition_clause
         # OpenLineage needs a valid SQL query with the input/output table(s) to parse
         self.sql = f"SELECT * FROM {self.table};"
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-
-        check_names = [*self.checks]
-        check_mins_sql = ",".join(
-            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
-        )
-        checks_sql = ",".join(
+        checks_sql = " UNION ALL ".join(
             [
-                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
-                    "check_name", check_name
-                )
+                self.sql_check_template.replace("check_statement", value["check_statement"])
+                .replace("_check_name", check_name)
+                .replace("table", self.table)
                 for check_name, value in self.checks.items()
             ]
         )
+        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
+        self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
+        f"AS check_table {partition_clause_statement};"

Review Comment:
   Yeah. Ash (nickname `Hawk-Eye`) noticed it after we merged and had started voting on the release. I decided to continue with the release since both operators were new, but I would love to get it fixed for 1.2.0.
   
   I hope you had good vacation @denimalpaca :D





[GitHub] [airflow] denimalpaca commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r926885517


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -73,6 +79,7 @@ class SQLColumnCheckOperator(BaseSQLOperator):
             }
         }
 
+    :param batch: a SQL statement that is added to a WHERE clause to create batches

Review Comment:
   Pushed a change to clarify; please let me know if it makes more sense now.





[GitHub] [airflow] potiuk commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r926935166


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -73,6 +79,7 @@ class SQLColumnCheckOperator(BaseSQLOperator):
             }
         }
 
+    :param batch: a SQL statement that is added to a WHERE clause to create batches

Review Comment:
   Hmm. I think a better name for that one would be `partition_clause`? "Batch" is usually associated with files, but for SQL tables "partitioning" is likely the more accurate name (assuming I guessed the intended use correctly).





[GitHub] [airflow] potiuk commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r955128658


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -273,38 +303,38 @@ def __init__(
 
         self.table = table
         self.checks = checks
+        self.partition_clause = partition_clause
         # OpenLineage needs a valid SQL query with the input/output table(s) to parse
         self.sql = f"SELECT * FROM {self.table};"
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-
-        check_names = [*self.checks]
-        check_mins_sql = ",".join(
-            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
-        )
-        checks_sql = ",".join(
+        checks_sql = " UNION ALL ".join(
             [
-                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
-                    "check_name", check_name
-                )
+                self.sql_check_template.replace("check_statement", value["check_statement"])
+                .replace("_check_name", check_name)
+                .replace("table", self.table)
                 for check_name, value in self.checks.items()
             ]
         )
+        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
+        self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
+        f"AS check_table {partition_clause_statement};"
 
-        self.sql = f"SELECT {check_mins_sql} FROM (SELECT {checks_sql} FROM {self.table});"
-        records = hook.get_first(self.sql)
+        records = hook.get_pandas_df(self.sql)

Review Comment:
   I'd be curious to hear more about it :D







[GitHub] [airflow] denimalpaca commented on pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on PR #25164:
URL: https://github.com/apache/airflow/pull/25164#issuecomment-1191512691

   > Some tests are failing
   
   Looking into this today




[GitHub] [airflow] denimalpaca commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
denimalpaca commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r955273173


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -273,38 +303,38 @@ def __init__(
 
         self.table = table
         self.checks = checks
+        self.partition_clause = partition_clause
         # OpenLineage needs a valid SQL query with the input/output table(s) to parse
         self.sql = f"SELECT * FROM {self.table};"
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-
-        check_names = [*self.checks]
-        check_mins_sql = ",".join(
-            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
-        )
-        checks_sql = ",".join(
+        checks_sql = " UNION ALL ".join(
             [
-                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
-                    "check_name", check_name
-                )
+                self.sql_check_template.replace("check_statement", value["check_statement"])
+                .replace("_check_name", check_name)
+                .replace("table", self.table)
                 for check_name, value in self.checks.items()
             ]
         )
+        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
+        self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
+        f"AS check_table {partition_clause_statement};"
 
-        self.sql = f"SELECT {check_mins_sql} FROM (SELECT {checks_sql} FROM {self.table});"
-        records = hook.get_first(self.sql)
+        records = hook.get_pandas_df(self.sql)

Review Comment:
   To expand a bit more: a check like `col_a + col_b >= col_c` would not work when there were multiple checks in the operator, because the previous `SELECT` statement would then fail and require a `GROUP BY` clause, iirc. So the check would either have to live in its own operator or be amended to `SUM(col_a) + SUM(col_b) >= SUM(col_c)`, which isn't quite the same check. The query therefore needed to be updated, and the one I wound up using returns multiple rows, so `get_first` is no longer useful. At the time of writing, it seemed that a pandas DataFrame might be easier to work with in the long term if more complicated uses of the retrieved data were implemented. But I now see that it's unneeded.
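The multi-row shape described above can be sketched without pandas. This minimal example assumes a hypothetical per-check template (not the operator's actual `sql_check_template`) that emits one `(check_name, check_result)` row per check, stacked with `UNION ALL`. `sqlite3` stands in for the hook. In Airflow, `DbApiHook.get_records` returns rows in the same list-of-tuples shape as `fetchall` here.

```python
import sqlite3

# Hypothetical per-check template: each check yields exactly one row, and the
# row-level condition is evaluated per row and then collapsed with MIN.
CHECK_TEMPLATE = (
    "SELECT '{name}' AS check_name, "
    "MIN(CASE WHEN {stmt} THEN 1 ELSE 0 END) AS check_result FROM {table}"
)


def build_table_checks_sql(table: str, checks: dict) -> str:
    """Render one SELECT per check and stack them with UNION ALL."""
    return " UNION ALL ".join(
        CHECK_TEMPLATE.format(name=name, stmt=spec["check_statement"], table=table)
        for name, spec in checks.items()
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (col_a INTEGER, col_b INTEGER, col_c INTEGER)")
conn.executemany("INSERT INTO t VALUES (?, ?, ?)", [(1, 2, 3), (2, 2, 5)])

checks = {
    # Row-level check that needs no SUM() rewrite: 1+2>=3 passes, 2+2>=5 fails.
    "row_sum_check": {"check_statement": "col_a + col_b >= col_c"},
    "positive_a_check": {"check_statement": "col_a > 0"},
}
rows = conn.execute(build_table_checks_sql("t", checks)).fetchall()

# One row per check: reading only the first row would drop the remaining checks.
failed = sorted(name for name, result in rows if result != 1)
print(failed)  # ['row_sum_check']
```

A plain list of tuples is enough to find the failing checks, which is consistent with the conclusion that the DataFrame is unneeded.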





[GitHub] [airflow] potiuk commented on a diff in pull request #25164: Common SQLCheckOperators Various Functionality Update

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25164:
URL: https://github.com/apache/airflow/pull/25164#discussion_r955132454


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -273,38 +303,38 @@ def __init__(
 
         self.table = table
         self.checks = checks
+        self.partition_clause = partition_clause
         # OpenLineage needs a valid SQL query with the input/output table(s) to parse
         self.sql = f"SELECT * FROM {self.table};"
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-
-        check_names = [*self.checks]
-        check_mins_sql = ",".join(
-            self.sql_min_template.replace("check_name", check_name) for check_name in check_names
-        )
-        checks_sql = ",".join(
+        checks_sql = " UNION ALL ".join(
             [
-                self.sql_check_template.replace("check_statement", value["check_statement"]).replace(
-                    "check_name", check_name
-                )
+                self.sql_check_template.replace("check_statement", value["check_statement"])
+                .replace("_check_name", check_name)
+                .replace("table", self.table)
                 for check_name, value in self.checks.items()
             ]
         )
+        partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
+        self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
+        f"AS check_table {partition_clause_statement};"

Review Comment:
   Yeah. Asgh (nickname `Hawk-Eye`) noticed it after we merged and started voting on it. I decided to continue with the release since both operators were new, but I would love to get it fixed for 1.2.0.
   
   I hope you had a good vacation @denimalpaca :D
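One detail in the quoted hunk is worth checking. The two f-strings that build `self.sql` sit on consecutive lines with no enclosing parentheses. Python only concatenates adjacent string literals inside a single expression, so the second line becomes a separate expression statement that is evaluated and discarded. As a result, the query stops after `FROM (...)` and loses both the alias and the `WHERE` clause. The standalone functions below are hypothetical and exist only to isolate that behavior.

```python
def build_sql_buggy(checks_sql: str, partition_clause_statement: str) -> str:
    sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
    f"AS check_table {partition_clause_statement};"  # separate statement: result discarded
    return sql


def build_sql_fixed(checks_sql: str, partition_clause_statement: str) -> str:
    # Parentheses turn the adjacent literals into one expression, so Python
    # joins them into a single string.
    sql = (
        f"SELECT check_name, check_result FROM ({checks_sql}) "
        f"AS check_table {partition_clause_statement};"
    )
    return sql


print(repr(build_sql_buggy("SELECT 1", "")))
# 'SELECT check_name, check_result FROM (SELECT 1) '
print(repr(build_sql_fixed("SELECT 1", "")))
# 'SELECT check_name, check_result FROM (SELECT 1) AS check_table ;'
```

Even with the parentheses, this `WHERE` clause would filter the outer `check_table`, which exposes only `check_name` and `check_result`. A partition predicate on a source-table column would therefore likely need to go inside each per-check subquery instead.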


