You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jm...@apache.org on 2021/03/11 03:55:44 UTC
[airflow] branch master updated: Add job labels to bigquery check operators. (#14685)
This is an automated email from the ASF dual-hosted git repository.
jmcarp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 943baff Add job labels to bigquery check operators. (#14685)
943baff is described below
commit 943baff6701f9f8591090bf76219571d7f5e2cc5
Author: Joshua Carp <jm...@gmail.com>
AuthorDate: Wed Mar 10 22:55:30 2021 -0500
Add job labels to bigquery check operators. (#14685)
---
airflow/providers/google/cloud/hooks/bigquery.py | 5 +++++
airflow/providers/google/cloud/operators/bigquery.py | 16 ++++++++++++++++
2 files changed, 21 insertions(+)
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 948cf8e..ede1235 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -81,6 +81,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
bigquery_conn_id: Optional[str] = None,
api_resource_configs: Optional[Dict] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ labels: Optional[Dict] = None,
) -> None:
# To preserve backward compatibility
# TODO: remove one day
@@ -101,6 +102,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
self.location = location
self.running_job_id = None # type: Optional[str]
self.api_resource_configs = api_resource_configs if api_resource_configs else {} # type: Dict
+ self.labels = labels
def get_conn(self) -> "BigQueryConnection":
"""Returns a BigQuery PEP 249 connection object."""
@@ -2060,6 +2062,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
if not self.project_id:
raise ValueError("The project_id should be set")
+ labels = labels or self.labels
schema_update_options = list(schema_update_options or [])
if time_partitioning is None:
@@ -2258,6 +2261,7 @@ class BigQueryBaseCursor(LoggingMixin):
api_resource_configs: Optional[Dict] = None,
location: Optional[str] = None,
num_retries: int = 5,
+ labels: Optional[Dict] = None,
) -> None:
super().__init__()
@@ -2270,6 +2274,7 @@ class BigQueryBaseCursor(LoggingMixin):
self.running_job_id = None # type: Optional[str]
self.location = location
self.num_retries = num_retries
+ self.labels = labels
self.hook = hook
def create_empty_table(self, *args, **kwargs) -> None:
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index cb55906..c831278 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -95,6 +95,7 @@ class _BigQueryDbHookMixin:
use_legacy_sql=self.use_legacy_sql,
location=self.location,
impersonation_chain=self.impersonation_chain,
+ labels=self.labels,
)
@@ -152,12 +153,15 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator):
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:type impersonation_chain: Union[str, Sequence[str]]
+ :param labels: a dictionary containing labels for the job/query, passed to BigQuery
+ :type labels: dict
"""
template_fields = (
'sql',
'gcp_conn_id',
'impersonation_chain',
+ 'labels',
)
template_ext = ('.sql',)
ui_color = BigQueryUIColors.CHECK.value
@@ -172,6 +176,7 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator):
use_legacy_sql: bool = True,
location: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ labels: Optional[dict] = None,
**kwargs,
) -> None:
super().__init__(sql=sql, **kwargs)
@@ -184,6 +189,7 @@ class BigQueryCheckOperator(_BigQueryDbHookMixin, SQLCheckOperator):
self.use_legacy_sql = use_legacy_sql
self.location = location
self.impersonation_chain = impersonation_chain
+ self.labels = labels
class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
@@ -216,6 +222,8 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:type impersonation_chain: Union[str, Sequence[str]]
+ :param labels: a dictionary containing labels for the job/query, passed to BigQuery
+ :type labels: dict
"""
template_fields = (
@@ -223,6 +231,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
'gcp_conn_id',
'pass_value',
'impersonation_chain',
+ 'labels',
)
template_ext = ('.sql',)
ui_color = BigQueryUIColors.CHECK.value
@@ -239,6 +248,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
use_legacy_sql: bool = True,
location: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ labels: Optional[dict] = None,
**kwargs,
) -> None:
super().__init__(sql=sql, pass_value=pass_value, tolerance=tolerance, **kwargs)
@@ -251,6 +261,7 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
self.gcp_conn_id = gcp_conn_id
self.use_legacy_sql = use_legacy_sql
self.impersonation_chain = impersonation_chain
+ self.labels = labels
class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperator):
@@ -296,6 +307,8 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:type impersonation_chain: Union[str, Sequence[str]]
+ :param labels: a dictionary containing labels for the job/query, passed to BigQuery
+ :type labels: dict
"""
template_fields = (
@@ -304,6 +317,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
'sql1',
'sql2',
'impersonation_chain',
+ 'labels',
)
ui_color = BigQueryUIColors.CHECK.value
@@ -320,6 +334,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
use_legacy_sql: bool = True,
location: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ labels: Optional[Dict] = None,
**kwargs,
) -> None:
super().__init__(
@@ -338,6 +353,7 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
self.use_legacy_sql = use_legacy_sql
self.location = location
self.impersonation_chain = impersonation_chain
+ self.labels = labels
class BigQueryGetDataOperator(BaseOperator):