Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/12 20:46:47 UTC

[GitHub] [airflow] potiuk commented on a diff in pull request #23915: Add new SQLCheckOperators

potiuk commented on code in PR #23915:
URL: https://github.com/apache/airflow/pull/23915#discussion_r895230631


##########
tests/operators/test_sql.py:
##########
@@ -385,6 +387,82 @@ def test_fail_min_sql_max_value(self, mock_get_db_hook):
             operator.execute()
 
 
+class TestColumnCheckOperator(unittest.TestCase):

Review Comment:
   We prefer pytest-style tests unless there are good reasons not to (for new tests).



##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of the database which overrides the one defined in the connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        # pass value should be number of acceptable distinct values
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        # pass value is implicit in the query, it does not need to be passed
+        "unique_check": "COUNT(DISTINCT(column)) = COUNT(column)",
+        # pass value should be the minimum acceptable numeric value
+        "min": "MIN(column) AS column_min",
+        # pass value should be the maximum acceptable numeric value
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        self.sql = f"SELECT * FROM {self.table};"

Review Comment:
   Just to reiterate - those "common" SQL classes will not be released until Airflow 2.4, which means that the first operator that will be able to use them as "mandatory" will come 12 months after 2.4 is released (that's our policy for providers), so I just want to make you aware @denimalpaca that those classes won't be too useful for quite some time.
   
   Maybe a better idea would be to add a separate "base sql" provider which would not live in "airflow/operators" but would be a separate package that could be installed alongside Airflow. We have not done so before, but maybe there is a good reason to introduce such a "shared sql" provider that will encapsulate all SQL-like base functionality that open lineage might build on? I think trying to implement it in Airflow core is a bad idea if the goal is fast adoption by multiple providers.
   
   Just saying.
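
For readers following the thread, a minimal usage sketch of the operator under review, put together from the docstring example above (the task id, connection id, table name, and check values are placeholders, not code from the PR):

    from airflow.operators.sql import SQLColumnCheckOperator

    column_checks = SQLColumnCheckOperator(
        task_id="users_column_checks",
        conn_id="my_db_conn",   # placeholder connection id
        table="users",          # placeholder table name
        column_mapping={
            "age": {
                "null_check": {"equal_to": 0},                  # no NULL values allowed
                "min": {"greater_than": 0},                     # minimum must be positive
                "max": {"less_than": 130, "tolerance": 0.01},   # upper sanity bound
            },
        },
    )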



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org