Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/27 11:03:41 UTC

[GitHub] [airflow] bhirsz commented on a diff in pull request #26368: Add BigQuery Column and Table Check Operators

bhirsz commented on code in PR #26368:
URL: https://github.com/apache/airflow/pull/26368#discussion_r981094773


##########
airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -520,6 +524,241 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
         )
 
 
+class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
+    """
+    BigQueryColumnCheckOperator subclasses the SQLColumnCheckOperator
+    in order to provide a job id for OpenLineage to parse. See base class
+    docstring for usage.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryColumnCheckOperator`
+
+    :param table: the table name
+    :param column_mapping: a dictionary relating columns to their checks
+    :param partition_clause: a string SQL statement added to a WHERE clause
+        to partition data
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :param use_legacy_sql: Whether to use legacy SQL (true)
+        or standard SQL (false).
+    :param location: The geographic location of the job. See details at:
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :param labels: a dictionary containing labels for the table, passed to BigQuery
+    """
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: dict,
+        partition_clause: str | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        use_legacy_sql: bool = True,
+        location: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        labels: dict | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(
+            table=table, column_mapping=column_mapping, partition_clause=partition_clause, **kwargs
+        )
+        self.table = table
+        self.column_mapping = column_mapping
+        self.partition_clause = partition_clause
+        self.gcp_conn_id = gcp_conn_id
+        self.use_legacy_sql = use_legacy_sql
+        self.location = location
+        self.impersonation_chain = impersonation_chain
+        self.labels = labels
+        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
+        self.sql = ""
+
+    def _submit_job(
+        self,
+        hook: BigQueryHook,
+        job_id: str,
+    ) -> BigQueryJob:
+        """Submit a new job and get the job id for polling the status using Trigger."""
+        configuration = {"query": {"query": self.sql}}
+
+        return hook.insert_job(
+            configuration=configuration,
+            project_id=hook.project_id,
+            location=self.location,
+            job_id=job_id,
+            nowait=False,
+        )
+
+    def execute(self, context=None):
+        """Perform checks on the given columns."""
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])

Review Comment:
   I know that `self.column_checks` is part of the parent class, but I see from git blame that you're its author, so I would recommend replacing strings such as this one:
   ```
   "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check"
   ```
   with a format string template:
   ```
   "SUM(CASE WHEN {column} IS NULL THEN 1 ELSE 0 END) AS {column}_null_check"
   ```
   The template can then be filled in with `self.column_checks[check].format(column=column)` instead of replacing the substring.
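A minimal sketch of the suggested change follows. It assumes hypothetical template dictionaries modelled on the null-check string quoted in the comment. The `blank_check` template and the helper names `checks_sql_replace` and `checks_sql_format` are illustrative only and are not part of the PR or of `SQLColumnCheckOperator`. The sketch contrasts the substring replacement used on the line under review with explicit `str.format` placeholders:

```python
"""Sketch: building per-column check SQL with str.format instead of str.replace.

The templates below are hypothetical, modelled on the null-check string quoted
in the review; they are not copied from SQLColumnCheckOperator.
"""

# Old style: the bare word "column" doubles as the placeholder.
REPLACE_TEMPLATES = {
    "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS column_null_check",
    # Hypothetical template whose string literal also contains the word "column".
    "blank_check": "SUM(CASE WHEN column = 'no column' THEN 1 ELSE 0 END) AS column_blank_check",
}

# Suggested style: explicit {column} placeholders, filled by str.format.
FORMAT_TEMPLATES = {
    "null_check": "SUM(CASE WHEN {column} IS NULL THEN 1 ELSE 0 END) AS {column}_null_check",
    "blank_check": "SUM(CASE WHEN {column} = 'no column' THEN 1 ELSE 0 END) AS {column}_blank_check",
}


def checks_sql_replace(column: str, checks: list) -> str:
    """Mirror of the line under review: every "column" substring is replaced."""
    return ",".join(REPLACE_TEMPLATES[check].replace("column", column) for check in checks)


def checks_sql_format(column: str, checks: list) -> str:
    """Suggested variant: only the named {column} placeholders are filled."""
    return ",".join(FORMAT_TEMPLATES[check].format(column=column) for check in checks)


if __name__ == "__main__":
    # The replace version also rewrites the literal 'no column' to 'no price'.
    print(checks_sql_replace("price", ["null_check", "blank_check"]))
    # The format version leaves the literal untouched.
    print(checks_sql_format("price", ["null_check", "blank_check"]))
```

For the null check both approaches produce the same SQL. The difference shows up only when a template contains the word "column" somewhere other than the intended substitution points. One trade-off of `str.format` is that `{` and `}` become placeholder delimiters, so any literal braces in a SQL template would have to be doubled (`{{`, `}}`).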



-- 
This is an automated message from the Apache Git Service.