You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jm...@apache.org on 2021/03/11 03:55:44 UTC

[airflow] branch master updated: Add job labels to bigquery check operators. (#14685)

This is an automated email from the ASF dual-hosted git repository.

jmcarp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 943baff  Add job labels to bigquery check operators. (#14685)
943baff is described below

commit 943baff6701f9f8591090bf76219571d7f5e2cc5
Author: Joshua Carp <jm...@gmail.com>
AuthorDate: Wed Mar 10 22:55:30 2021 -0500

    Add job labels to bigquery check operators. (#14685)
---
 airflow/providers/google/cloud/hooks/bigquery.py     |  5 +++++
 airflow/providers/google/cloud/operators/bigquery.py | 16 ++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 948cf8e..ede1235 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -81,6 +81,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         bigquery_conn_id: Optional[str] = None,
         api_resource_configs: Optional[Dict] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        labels: Optional[Dict] = None,
     ) -> None:
         # To preserve backward compatibility
         # TODO: remove one day
@@ -101,6 +102,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         self.location = location
         self.running_job_id = None  # type: Optional[str]
         self.api_resource_configs = api_resource_configs if api_resource_configs else {}  # type Dict
+        self.labels = labels
 
     def get_conn(self) -> "BigQueryConnection":
         """Returns a BigQuery PEP 249 connection object."""
@@ -2060,6 +2062,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         if not self.project_id:
             raise ValueError("The project_id should be set")
 
+        labels = labels or self.labels
         schema_update_options = list(schema_update_options or [])
 
         if time_partitioning is None:
@@ -2258,6 +2261,7 @@ class BigQueryBaseCursor(LoggingMixin):
         api_resource_configs: Optional[Dict] = None,
         location: Optional[str] = None,
         num_retries: int = 5,
+        labels: Optional[Dict] = None,
     ) -> None:
 
         super().__init__()
@@ -2270,6 +2274,7 @@ class BigQueryBaseCursor(LoggingMixin):
         self.running_job_id = None  # type: Optional[str]
         self.location = location
         self.num_retries = num_retries
+        self.labels = labels
         self.hook = hook
 
     def create_empty_table(self, *args, **kwargs) -> None:
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index cb55906..c831278 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -95,6 +95,7 @@ class _BigQueryDbHookMixin:
             use_legacy_sql=self.use_legacy_sql,
             location=self.location,
             impersonation_chain=self.impersonation_chain,
+            labels=self.labels,
         )
 
 
@@ -152,12 +153,15 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator):
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
     :type impersonation_chain: Union[str, Sequence[str]]
+    :param labels: a dictionary containing labels for the table, passed to BigQuery
+    :type labels: dict
     """
 
     template_fields = (
         'sql',
         'gcp_conn_id',
         'impersonation_chain',
+        'labels',
     )
     template_ext = ('.sql',)
     ui_color = BigQueryUIColors.CHECK.value
@@ -172,6 +176,7 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator):
         use_legacy_sql: bool = True,
         location: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        labels: Optional[dict] = None,
         **kwargs,
     ) -> None:
         super().__init__(sql=sql, **kwargs)
@@ -184,6 +189,7 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator):
         self.use_legacy_sql = use_legacy_sql
         self.location = location
         self.impersonation_chain = impersonation_chain
+        self.labels = labels
 
 
 class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
@@ -216,6 +222,8 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
     :type impersonation_chain: Union[str, Sequence[str]]
+    :param labels: a dictionary containing labels for the table, passed to BigQuery
+    :type labels: dict
     """
 
     template_fields = (
@@ -223,6 +231,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
         'gcp_conn_id',
         'pass_value',
         'impersonation_chain',
+        'labels',
     )
     template_ext = ('.sql',)
     ui_color = BigQueryUIColors.CHECK.value
@@ -239,6 +248,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
         use_legacy_sql: bool = True,
         location: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        labels: Optional[dict] = None,
         **kwargs,
     ) -> None:
         super().__init__(sql=sql, pass_value=pass_value, tolerance=tolerance, **kwargs)
@@ -251,6 +261,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
         self.gcp_conn_id = gcp_conn_id
         self.use_legacy_sql = use_legacy_sql
         self.impersonation_chain = impersonation_chain
+        self.labels = labels
 
 
 class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperator):
@@ -296,6 +307,8 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
     :type impersonation_chain: Union[str, Sequence[str]]
+    :param labels: a dictionary containing labels for the table, passed to BigQuery
+    :type labels: dict
     """
 
     template_fields = (
@@ -304,6 +317,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         'sql1',
         'sql2',
         'impersonation_chain',
+        'labels',
     )
     ui_color = BigQueryUIColors.CHECK.value
 
@@ -320,6 +334,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         use_legacy_sql: bool = True,
         location: Optional[str] = None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        labels: Optional[Dict] = None,
         **kwargs,
     ) -> None:
         super().__init__(
@@ -338,6 +353,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
         self.use_legacy_sql = use_legacy_sql
         self.location = location
         self.impersonation_chain = impersonation_chain
+        self.labels = labels
 
 
 class BigQueryGetDataOperator(BaseOperator):