Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/30 18:34:44 UTC

[airflow] branch main updated: Bigquery assets (#23165)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 511d0ee256 Bigquery assets (#23165)
511d0ee256 is described below

commit 511d0ee256b819690ccf0f6b30d12340b1dd7f0a
Author: Wojciech Januszek <wj...@sigma.ug.edu.pl>
AuthorDate: Sat Apr 30 20:34:39 2022 +0200

    Bigquery assets (#23165)
---
 airflow/providers/google/cloud/hooks/bigquery.py   |  19 ++--
 airflow/providers/google/cloud/links/bigquery.py   |  77 ++++++++++++++++
 .../providers/google/cloud/operators/bigquery.py   | 102 +++++++++++++++++++--
 .../google/cloud/transfers/bigquery_to_bigquery.py |  14 ++-
 .../google/cloud/transfers/bigquery_to_gcs.py      |  15 ++-
 .../google/cloud/transfers/bigquery_to_mssql.py    |  10 ++
 airflow/providers/google/provider.yaml             |   2 +
 .../providers/google/cloud/hooks/test_bigquery.py  |  10 --
 .../google/cloud/operators/test_bigquery.py        |  30 +++---
 .../cloud/transfers/test_bigquery_to_bigquery.py   |   2 +-
 .../google/cloud/transfers/test_bigquery_to_gcs.py |   2 +-
 .../cloud/transfers/test_bigquery_to_mssql.py      |   2 +-
 12 files changed, 240 insertions(+), 45 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index 339cd74dee..d4f54f56ce 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -408,7 +408,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         location: Optional[str] = None,
         dataset_reference: Optional[Dict[str, Any]] = None,
         exists_ok: bool = True,
-    ) -> None:
+    ) -> Dict[str, Any]:
         """
         Create a new empty dataset:
         https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/insert
@@ -452,8 +452,11 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
 
         dataset: Dataset = Dataset.from_api_repr(dataset_reference)
         self.log.info('Creating dataset: %s in project: %s ', dataset.dataset_id, dataset.project)
-        self.get_client(location=location).create_dataset(dataset=dataset, exists_ok=exists_ok)
+        dataset_object = self.get_client(location=location).create_dataset(
+            dataset=dataset, exists_ok=exists_ok
+        )
         self.log.info('Dataset created successfully.')
+        return dataset_object.to_api_repr()
 
     @GoogleBaseHook.fallback_to_default_project_id
     def get_dataset_tables(
@@ -533,7 +536,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         encryption_configuration: Optional[Dict] = None,
         location: Optional[str] = None,
         project_id: Optional[str] = None,
-    ) -> None:
+    ) -> Table:
         """
         Creates a new external table in the dataset with the data from Google
         Cloud Storage. See here:
@@ -659,10 +662,11 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
             table.encryption_configuration = EncryptionConfiguration.from_api_repr(encryption_configuration)
 
         self.log.info('Creating external table: %s', external_project_dataset_table)
-        self.create_empty_table(
+        table_object = self.create_empty_table(
             table_resource=table.to_api_repr(), project_id=project_id, location=location, exists_ok=True
         )
         self.log.info('External table created successfully: %s', external_project_dataset_table)
+        return table_object
 
     @GoogleBaseHook.fallback_to_default_project_id
     def update_table(
@@ -1287,7 +1291,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         dataset_id: str,
         table_id: str,
         project_id: Optional[str] = None,
-    ) -> None:
+    ) -> Dict[str, Any]:
         """
         Update fields within a schema for a given dataset and table. Note that
         some fields in schemas are immutable and trying to change them will cause
@@ -1361,13 +1365,14 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         if not include_policy_tags:
             _remove_policy_tags(new_schema)
 
-        self.update_table(
+        table = self.update_table(
             table_resource={"schema": {"fields": new_schema}},
             fields=["schema"],
             project_id=project_id,
             dataset_id=dataset_id,
             table_id=table_id,
         )
+        return table
 
     @GoogleBaseHook.fallback_to_default_project_id
     def poll_job_complete(
@@ -2244,7 +2249,7 @@ class BigQueryBaseCursor(LoggingMixin):
         )
         return self.hook.create_empty_table(*args, **kwargs)
 
-    def create_empty_dataset(self, *args, **kwargs) -> None:
+    def create_empty_dataset(self, *args, **kwargs) -> Dict[str, Any]:
         """
         This method is deprecated.
         Please use `airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.create_empty_dataset`
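
The hook changes above make each create/update method return the API
representation of the resource it touched instead of None. A minimal sketch
of consuming the new return value (connection ID, project, and dataset names
are hypothetical):

    from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

    hook = BigQueryHook(gcp_conn_id="google_cloud_default")
    dataset = hook.create_empty_dataset(
        project_id="my-project",      # hypothetical
        dataset_id="my_dataset",      # hypothetical
        exists_ok=True,
    )
    # create_empty_dataset() now returns Dataset.to_api_repr(), e.g.:
    # {"datasetReference": {"projectId": "my-project", "datasetId": "my_dataset"}, ...}
    print(dataset["datasetReference"]["datasetId"])
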
diff --git a/airflow/providers/google/cloud/links/bigquery.py b/airflow/providers/google/cloud/links/bigquery.py
new file mode 100644
index 0000000000..a80818e203
--- /dev/null
+++ b/airflow/providers/google/cloud/links/bigquery.py
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google BigQuery links."""
+from typing import TYPE_CHECKING
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+BIGQUERY_BASE_LINK = "https://console.cloud.google.com/bigquery"
+BIGQUERY_DATASET_LINK = (
+    BIGQUERY_BASE_LINK + "?referrer=search&project={project_id}&d={dataset_id}&p={project_id}&page=dataset"
+)
+BIGQUERY_TABLE_LINK = (
+    BIGQUERY_BASE_LINK
+    + "?referrer=search&project={project_id}&d={dataset_id}&p={project_id}&page=table&t={table_id}"
+)
+
+
+class BigQueryDatasetLink(BaseGoogleLink):
+    """Helper class for constructing BigQuery Dataset Link"""
+
+    name = "BigQuery Dataset"
+    key = "bigquery_dataset"
+    format_str = BIGQUERY_DATASET_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        dataset_id: str,
+        project_id: str,
+    ):
+        task_instance.xcom_push(
+            context,
+            key=BigQueryDatasetLink.key,
+            value={"dataset_id": dataset_id, "project_id": project_id},
+        )
+
+
+class BigQueryTableLink(BaseGoogleLink):
+    """Helper class for constructing BigQuery Table Link"""
+
+    name = "BigQuery Table"
+    key = "bigquery_table"
+    format_str = BIGQUERY_TABLE_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        dataset_id: str,
+        project_id: str,
+        table_id: str,
+    ):
+        task_instance.xcom_push(
+            context,
+            key=BigQueryTableLink.key,
+            value={"dataset_id": dataset_id, "project_id": project_id, "table_id": table_id},
+        )
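
The persisted XCom value is a dict whose keys match the placeholders in
format_str, so the rendered URL is effectively the following (a sketch of the
string formatting only; the actual assembly happens in BaseGoogleLink, and the
IDs are hypothetical):

    url = BIGQUERY_TABLE_LINK.format(
        project_id="my-project",
        dataset_id="my_dataset",
        table_id="my_table",
    )
    # -> https://console.cloud.google.com/bigquery?referrer=search&project=my-project
    #    &d=my_dataset&p=my-project&page=table&t=my_table
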
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 8eb5c67b86..ecd42a576b 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -38,6 +38,7 @@ from airflow.models.xcom import XCom
 from airflow.operators.sql import SQLCheckOperator, SQLIntervalCheckOperator, SQLValueCheckOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
 from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url
+from airflow.providers.google.cloud.links.bigquery import BigQueryDatasetLink, BigQueryTableLink
 
 if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstanceKey
@@ -798,6 +799,7 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
     )
     template_fields_renderers = {"table_resource": "json", "materialized_view": "json"}
     ui_color = BigQueryUIColors.TABLE.value
+    operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
@@ -879,6 +881,13 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
                 table_resource=self.table_resource,
                 exists_ok=self.exists_ok,
             )
+            BigQueryTableLink.persist(
+                context=context,
+                task_instance=self,
+                dataset_id=table.to_api_repr()["tableReference"]["datasetId"],
+                project_id=table.to_api_repr()["tableReference"]["projectId"],
+                table_id=table.to_api_repr()["tableReference"]["tableId"],
+            )
             self.log.info(
                 'Table %s.%s.%s created successfully', table.project, table.dataset_id, table.table_id
             )
@@ -977,6 +986,7 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
     )
     template_fields_renderers = {"table_resource": "json"}
     ui_color = BigQueryUIColors.TABLE.value
+    operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
@@ -1094,9 +1104,16 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
             impersonation_chain=self.impersonation_chain,
         )
         if self.table_resource:
-            bq_hook.create_empty_table(
+            table = bq_hook.create_empty_table(
                 table_resource=self.table_resource,
             )
+            BigQueryTableLink.persist(
+                context=context,
+                task_instance=self,
+                dataset_id=table.to_api_repr()["tableReference"]["datasetId"],
+                project_id=table.to_api_repr()["tableReference"]["projectId"],
+                table_id=table.to_api_repr()["tableReference"]["tableId"],
+            )
             return
 
         if not self.schema_fields and self.schema_object and self.source_format != 'DATASTORE_BACKUP':
@@ -1111,7 +1128,7 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
 
         source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
 
-        bq_hook.create_external_table(
+        table = bq_hook.create_external_table(
             external_project_dataset_table=self.destination_project_dataset_table,
             schema_fields=schema_fields,
             source_uris=source_uris,
@@ -1128,6 +1145,13 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
             labels=self.labels,
             encryption_configuration=self.encryption_configuration,
         )
+        BigQueryTableLink.persist(
+            context=context,
+            task_instance=self,
+            dataset_id=table.to_api_repr()["tableReference"]["datasetId"],
+            project_id=table.to_api_repr()["tableReference"]["projectId"],
+            table_id=table.to_api_repr()["tableReference"]["tableId"],
+        )
 
 
 class BigQueryDeleteDatasetOperator(BaseOperator):
@@ -1257,6 +1281,7 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
     )
     template_fields_renderers = {"dataset_reference": "json"}
     ui_color = BigQueryUIColors.DATASET.value
+    operator_extra_links = (BigQueryDatasetLink(),)
 
     def __init__(
         self,
@@ -1292,13 +1317,19 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
         )
 
         try:
-            bq_hook.create_empty_dataset(
+            dataset = bq_hook.create_empty_dataset(
                 project_id=self.project_id,
                 dataset_id=self.dataset_id,
                 dataset_reference=self.dataset_reference,
                 location=self.location,
                 exists_ok=self.exists_ok,
             )
+            BigQueryDatasetLink.persist(
+                context=context,
+                task_instance=self,
+                dataset_id=dataset["datasetReference"]["datasetId"],
+                project_id=dataset["datasetReference"]["projectId"],
+            )
         except Conflict:
             dataset_id = self.dataset_reference.get("datasetReference", {}).get("datasetId", self.dataset_id)
             self.log.info('Dataset %s already exists.', dataset_id)
@@ -1339,6 +1370,7 @@ class BigQueryGetDatasetOperator(BaseOperator):
         'impersonation_chain',
     )
     ui_color = BigQueryUIColors.DATASET.value
+    operator_extra_links = (BigQueryDatasetLink(),)
 
     def __init__(
         self,
@@ -1367,7 +1399,14 @@ class BigQueryGetDatasetOperator(BaseOperator):
         self.log.info('Start getting dataset: %s:%s', self.project_id, self.dataset_id)
 
         dataset = bq_hook.get_dataset(dataset_id=self.dataset_id, project_id=self.project_id)
-        return dataset.to_api_repr()
+        dataset = dataset.to_api_repr()
+        BigQueryDatasetLink.persist(
+            context=context,
+            task_instance=self,
+            dataset_id=dataset["datasetReference"]["datasetId"],
+            project_id=dataset["datasetReference"]["projectId"],
+        )
+        return dataset
 
 
 class BigQueryGetDatasetTablesOperator(BaseOperator):
@@ -1558,6 +1597,7 @@ class BigQueryUpdateTableOperator(BaseOperator):
     )
     template_fields_renderers = {"table_resource": "json"}
     ui_color = BigQueryUIColors.TABLE.value
+    operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
@@ -1589,7 +1629,7 @@ class BigQueryUpdateTableOperator(BaseOperator):
             impersonation_chain=self.impersonation_chain,
         )
 
-        return bq_hook.update_table(
+        table = bq_hook.update_table(
             table_resource=self.table_resource,
             fields=self.fields,
             dataset_id=self.dataset_id,
@@ -1597,6 +1637,16 @@ class BigQueryUpdateTableOperator(BaseOperator):
             project_id=self.project_id,
         )
 
+        BigQueryTableLink.persist(
+            context=context,
+            task_instance=self,
+            dataset_id=table["tableReference"]["datasetId"],
+            project_id=table["tableReference"]["projectId"],
+            table_id=table["tableReference"]["tableId"],
+        )
+
+        return table
+
 
 class BigQueryUpdateDatasetOperator(BaseOperator):
     """
@@ -1641,6 +1691,7 @@ class BigQueryUpdateDatasetOperator(BaseOperator):
     )
     template_fields_renderers = {"dataset_resource": "json"}
     ui_color = BigQueryUIColors.DATASET.value
+    operator_extra_links = (BigQueryDatasetLink(),)
 
     def __init__(
         self,
@@ -1677,7 +1728,15 @@ class BigQueryUpdateDatasetOperator(BaseOperator):
             dataset_id=self.dataset_id,
             fields=fields,
         )
-        return dataset.to_api_repr()
+
+        dataset = dataset.to_api_repr()
+        BigQueryDatasetLink.persist(
+            context=context,
+            task_instance=self,
+            dataset_id=dataset["datasetReference"]["datasetId"],
+            project_id=dataset["datasetReference"]["projectId"],
+        )
+        return dataset
 
 
 class BigQueryDeleteTableOperator(BaseOperator):
@@ -1782,6 +1841,7 @@ class BigQueryUpsertTableOperator(BaseOperator):
     )
     template_fields_renderers = {"table_resource": "json"}
     ui_color = BigQueryUIColors.TABLE.value
+    operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
@@ -1813,11 +1873,18 @@ class BigQueryUpsertTableOperator(BaseOperator):
             location=self.location,
             impersonation_chain=self.impersonation_chain,
         )
-        hook.run_table_upsert(
+        table = hook.run_table_upsert(
             dataset_id=self.dataset_id,
             table_resource=self.table_resource,
             project_id=self.project_id,
         )
+        BigQueryTableLink.persist(
+            context=context,
+            task_instance=self,
+            dataset_id=table["tableReference"]["datasetId"],
+            project_id=table["tableReference"]["projectId"],
+            table_id=table["tableReference"]["tableId"],
+        )
 
 
 class BigQueryUpdateTableSchemaOperator(BaseOperator):
@@ -1879,6 +1946,7 @@ class BigQueryUpdateTableSchemaOperator(BaseOperator):
     )
     template_fields_renderers = {"schema_fields_updates": "json"}
     ui_color = BigQueryUIColors.TABLE.value
+    operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
@@ -1910,7 +1978,7 @@ class BigQueryUpdateTableSchemaOperator(BaseOperator):
             impersonation_chain=self.impersonation_chain,
         )
 
-        return bq_hook.update_table_schema(
+        table = bq_hook.update_table_schema(
             schema_fields_updates=self.schema_fields_updates,
             include_policy_tags=self.include_policy_tags,
             dataset_id=self.dataset_id,
@@ -1918,6 +1986,15 @@ class BigQueryUpdateTableSchemaOperator(BaseOperator):
             project_id=self.project_id,
         )
 
+        BigQueryTableLink.persist(
+            context=context,
+            task_instance=self,
+            dataset_id=table["tableReference"]["datasetId"],
+            project_id=table["tableReference"]["projectId"],
+            table_id=table["tableReference"]["tableId"],
+        )
+        return table
+
 
 class BigQueryInsertJobOperator(BaseOperator):
     """
@@ -1983,6 +2060,7 @@ class BigQueryInsertJobOperator(BaseOperator):
     )
     template_fields_renderers = {"configuration": "json", "configuration.query.query": "sql"}
     ui_color = BigQueryUIColors.QUERY.value
+    operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
@@ -2088,6 +2166,14 @@ class BigQueryInsertJobOperator(BaseOperator):
                     f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                 )
 
+        table = job.to_api_repr()["configuration"]["query"]["destinationTable"]
+        BigQueryTableLink.persist(
+            context=context,
+            task_instance=self,
+            dataset_id=table["datasetId"],
+            project_id=table["projectId"],
+            table_id=table["tableId"],
+        )
         self.job_id = job.job_id
         return job.job_id
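
With operator_extra_links set, task instances built from the operators above
expose a Cloud Console link in the UI once persist() has pushed the IDs. A
minimal DAG sketch (all IDs hypothetical):

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.bigquery import (
        BigQueryCreateEmptyDatasetOperator,
        BigQueryCreateEmptyTableOperator,
    )

    with DAG(
        "bigquery_assets_demo",
        start_date=datetime(2022, 1, 1),
        schedule_interval=None,
    ) as dag:
        create_dataset = BigQueryCreateEmptyDatasetOperator(
            task_id="create_dataset",
            project_id="my-project",   # hypothetical
            dataset_id="my_dataset",   # hypothetical
        )
        create_table = BigQueryCreateEmptyTableOperator(
            task_id="create_table",
            project_id="my-project",
            dataset_id="my_dataset",
            table_id="my_table",       # hypothetical
        )
        # After a run, the tasks show "BigQuery Dataset" and "BigQuery Table"
        # extra links pointing at the Cloud Console.
        create_dataset >> create_table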
 
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_bigquery.py b/airflow/providers/google/cloud/transfers/bigquery_to_bigquery.py
index b81d347746..87e6f3f586 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_bigquery.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_bigquery.py
@@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -74,6 +75,7 @@ class BigQueryToBigQueryOperator(BaseOperator):
     )
     template_ext: Sequence[str] = ('.sql',)
     ui_color = '#e6f0e4'
+    operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
@@ -118,7 +120,7 @@ class BigQueryToBigQueryOperator(BaseOperator):
 
         with warnings.catch_warnings():
             warnings.simplefilter("ignore", DeprecationWarning)
-            hook.run_copy(
+            job_id = hook.run_copy(
                 source_project_dataset_tables=self.source_project_dataset_tables,
                 destination_project_dataset_table=self.destination_project_dataset_table,
                 write_disposition=self.write_disposition,
@@ -126,3 +128,13 @@ class BigQueryToBigQueryOperator(BaseOperator):
                 labels=self.labels,
                 encryption_configuration=self.encryption_configuration,
             )
+
+            job = hook.get_job(job_id=job_id).to_api_repr()
+            conf = job["configuration"]["copy"]["destinationTable"]
+            BigQueryTableLink.persist(
+                context=context,
+                task_instance=self,
+                dataset_id=conf["datasetId"],
+                project_id=conf["projectId"],
+                table_id=conf["tableId"],
+            )
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
index a0d7b3cfff..09ac190e0f 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_gcs.py
@@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Union
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -70,6 +71,7 @@ class BigQueryToGCSOperator(BaseOperator):
     )
     template_ext: Sequence[str] = ()
     ui_color = '#e4e6f0'
+    operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
@@ -113,7 +115,7 @@ class BigQueryToGCSOperator(BaseOperator):
             location=self.location,
             impersonation_chain=self.impersonation_chain,
         )
-        hook.run_extract(
+        job_id = hook.run_extract(
             source_project_dataset_table=self.source_project_dataset_table,
             destination_cloud_storage_uris=self.destination_cloud_storage_uris,
             compression=self.compression,
@@ -122,3 +124,14 @@ class BigQueryToGCSOperator(BaseOperator):
             print_header=self.print_header,
             labels=self.labels,
         )
+
+        job = hook.get_job(job_id=job_id).to_api_repr()
+        conf = job["configuration"]["extract"]["sourceTable"]
+        dataset_id, project_id, table_id = conf["datasetId"], conf["projectId"], conf["tableId"]
+        BigQueryTableLink.persist(
+            context=context,
+            task_instance=self,
+            dataset_id=dataset_id,
+            project_id=project_id,
+            table_id=table_id,
+        )
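
Both transfer operators above recover the table reference from the finished
job's API representation rather than from operator arguments. A sketch of
that lookup, assuming job_id was returned by run_copy or run_extract:

    job = hook.get_job(job_id=job_id).to_api_repr()

    # Copy jobs keep the table under configuration.copy.destinationTable,
    # extract jobs under configuration.extract.sourceTable.
    if "copy" in job["configuration"]:
        table_ref = job["configuration"]["copy"]["destinationTable"]
    else:
        table_ref = job["configuration"]["extract"]["sourceTable"]
    print(table_ref["projectId"], table_ref["datasetId"], table_ref["tableId"])
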
diff --git a/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py b/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
index ca63ff0d99..d8a600eabe 100644
--- a/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
+++ b/airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
@@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, List, Optional, Sequence, Union
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
 from airflow.providers.google.cloud.utils.bigquery_get_data import bigquery_get_data
 from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
 
@@ -75,6 +76,7 @@ class BigQueryToMsSqlOperator(BaseOperator):
     """
 
     template_fields: Sequence[str] = ('source_project_dataset_table', 'mssql_table', 'impersonation_chain')
+    operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
@@ -118,6 +120,14 @@ class BigQueryToMsSqlOperator(BaseOperator):
             location=self.location,
             impersonation_chain=self.impersonation_chain,
         )
+        project_id, dataset_id, table_id = self.source_project_dataset_table.split('.')
+        BigQueryTableLink.persist(
+            context=context,
+            task_instance=self,
+            dataset_id=dataset_id,
+            project_id=project_id,
+            table_id=table_id,
+        )
         mssql_hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id, schema=self.database)
         for rows in bigquery_get_data(
             self.log,
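
BigQueryToMsSqlOperator has no job object to inspect at this point, so it
parses the IDs straight from the templated string, which therefore must be
fully qualified. For example:

    # Assumes a fully qualified "project.dataset.table" value; a two-part
    # "dataset.table" string would raise a ValueError on unpacking.
    source = "my-project.my_dataset.my_table"  # hypothetical
    project_id, dataset_id, table_id = source.split('.')
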
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 35524343d2..4089cefd99 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -884,6 +884,8 @@ extra-links:
   - airflow.providers.google.cloud.operators.datafusion.DataFusionPipelinesLink
   - airflow.providers.google.cloud.links.dataplex.DataplexTaskLink
   - airflow.providers.google.cloud.links.dataplex.DataplexTasksLink
+  - airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink
+  - airflow.providers.google.cloud.links.bigquery.BigQueryTableLink
   - airflow.providers.google.cloud.links.bigquery_dts.BigQueryDataTransferConfigLink
   - airflow.providers.google.cloud.links.dataproc.DataprocLink
   - airflow.providers.google.cloud.links.dataproc.DataprocListLink
diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py
index 5f02239318..9de8333eb4 100644
--- a/tests/providers/google/cloud/hooks/test_bigquery.py
+++ b/tests/providers/google/cloud/hooks/test_bigquery.py
@@ -918,16 +918,6 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
     def test_dbapi_get_uri(self):
         assert self.hook.get_uri().startswith('bigquery://')
 
-    def test_dbapi_get_sqlalchemy_engine_failed(self):
-        with pytest.raises(
-            AirflowException,
-            match="For now, we only support instantiating SQLAlchemy engine by"
-            " using ADC"
-            ", extra__google_cloud_platform__key_path"
-            "and extra__google_cloud_platform__keyfile_dict",
-        ):
-            self.hook.get_sqlalchemy_engine()
-
 
 class TestBigQueryTableSplitter(unittest.TestCase):
     def test_internal_need_default_project(self):
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index 20bb205bb0..b5e42cce2f 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -82,7 +82,7 @@ class TestBigQueryCreateEmptyTableOperator(unittest.TestCase):
             task_id=TASK_ID, dataset_id=TEST_DATASET, project_id=TEST_GCP_PROJECT_ID, table_id=TEST_TABLE_ID
         )
 
-        operator.execute(None)
+        operator.execute(context=MagicMock())
         mock_hook.return_value.create_empty_table.assert_called_once_with(
             dataset_id=TEST_DATASET,
             project_id=TEST_GCP_PROJECT_ID,
@@ -108,7 +108,7 @@ class TestBigQueryCreateEmptyTableOperator(unittest.TestCase):
             view=VIEW_DEFINITION,
         )
 
-        operator.execute(None)
+        operator.execute(context=MagicMock())
         mock_hook.return_value.create_empty_table.assert_called_once_with(
             dataset_id=TEST_DATASET,
             project_id=TEST_GCP_PROJECT_ID,
@@ -134,7 +134,7 @@ class TestBigQueryCreateEmptyTableOperator(unittest.TestCase):
             materialized_view=MATERIALIZED_VIEW_DEFINITION,
         )
 
-        operator.execute(None)
+        operator.execute(context=MagicMock())
         mock_hook.return_value.create_empty_table.assert_called_once_with(
             dataset_id=TEST_DATASET,
             project_id=TEST_GCP_PROJECT_ID,
@@ -170,7 +170,7 @@ class TestBigQueryCreateEmptyTableOperator(unittest.TestCase):
             cluster_fields=cluster_fields,
         )
 
-        operator.execute(None)
+        operator.execute(context=MagicMock())
         mock_hook.return_value.create_empty_table.assert_called_once_with(
             dataset_id=TEST_DATASET,
             project_id=TEST_GCP_PROJECT_ID,
@@ -200,7 +200,7 @@ class TestBigQueryCreateExternalTableOperator(unittest.TestCase):
             autodetect=True,
         )
 
-        operator.execute(None)
+        operator.execute(context=MagicMock())
         mock_hook.return_value.create_external_table.assert_called_once_with(
             external_project_dataset_table=f'{TEST_DATASET}.{TEST_TABLE_ID}',
             schema_fields=[],
@@ -246,7 +246,7 @@ class TestBigQueryCreateEmptyDatasetOperator(unittest.TestCase):
             location=TEST_DATASET_LOCATION,
         )
 
-        operator.execute(None)
+        operator.execute(context=MagicMock())
         mock_hook.return_value.create_empty_dataset.assert_called_once_with(
             dataset_id=TEST_DATASET,
             project_id=TEST_GCP_PROJECT_ID,
@@ -263,7 +263,7 @@ class TestBigQueryGetDatasetOperator(unittest.TestCase):
             task_id=TASK_ID, dataset_id=TEST_DATASET, project_id=TEST_GCP_PROJECT_ID
         )
 
-        operator.execute(None)
+        operator.execute(context=MagicMock())
         mock_hook.return_value.get_dataset.assert_called_once_with(
             dataset_id=TEST_DATASET, project_id=TEST_GCP_PROJECT_ID
         )
@@ -281,7 +281,7 @@ class TestBigQueryUpdateTableOperator(unittest.TestCase):
             project_id=TEST_GCP_PROJECT_ID,
         )
 
-        operator.execute(None)
+        operator.execute(context=MagicMock())
         mock_hook.return_value.update_table.assert_called_once_with(
             table_resource=table_resource,
             fields=None,
@@ -310,7 +310,7 @@ class TestBigQueryUpdateTableSchemaOperator(unittest.TestCase):
             table_id=TEST_TABLE_ID,
             project_id=TEST_GCP_PROJECT_ID,
         )
-        operator.execute(None)
+        operator.execute(context=MagicMock())
 
         mock_hook.return_value.update_table_schema.assert_called_once_with(
             schema_fields_updates=schema_field_updates,
@@ -349,7 +349,7 @@ class TestBigQueryUpdateDatasetOperator(unittest.TestCase):
             project_id=TEST_GCP_PROJECT_ID,
         )
 
-        operator.execute(None)
+        operator.execute(context=MagicMock())
         mock_hook.return_value.update_dataset.assert_called_once_with(
             dataset_resource=dataset_resource,
             dataset_id=TEST_DATASET,
@@ -779,7 +779,7 @@ class TestBigQueryUpsertTableOperator(unittest.TestCase):
             project_id=TEST_GCP_PROJECT_ID,
         )
 
-        operator.execute(None)
+        operator.execute(context=MagicMock())
         mock_hook.return_value.run_table_upsert.assert_called_once_with(
             dataset_id=TEST_DATASET, project_id=TEST_GCP_PROJECT_ID, table_resource=TEST_TABLE_RESOURCES
         )
@@ -809,7 +809,7 @@ class TestBigQueryInsertJobOperator:
             job_id=job_id,
             project_id=TEST_GCP_PROJECT_ID,
         )
-        result = op.execute({})
+        result = op.execute(context=MagicMock())
 
         mock_hook.return_value.insert_job.assert_called_once_with(
             configuration=configuration,
@@ -846,7 +846,7 @@ class TestBigQueryInsertJobOperator:
             project_id=TEST_GCP_PROJECT_ID,
             cancel_on_kill=False,
         )
-        op.execute({})
+        op.execute(context=MagicMock())
 
         op.on_kill()
         mock_hook.return_value.cancel_job.assert_not_called()
@@ -917,7 +917,7 @@ class TestBigQueryInsertJobOperator:
             project_id=TEST_GCP_PROJECT_ID,
             reattach_states={"PENDING"},
         )
-        result = op.execute({})
+        result = op.execute(context=MagicMock())
 
         mock_hook.return_value.get_job.assert_called_once_with(
             location=TEST_DATASET_LOCATION,
@@ -962,7 +962,7 @@ class TestBigQueryInsertJobOperator:
             project_id=TEST_GCP_PROJECT_ID,
             force_rerun=True,
         )
-        result = op.execute({})
+        result = op.execute(context=MagicMock())
 
         mock_hook.return_value.insert_job.assert_called_once_with(
             configuration=configuration,
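
The test updates in this and the following files replace operator.execute(None)
(or op.execute({})) with a MagicMock context, because the operators now call
the link persist() helpers, which push XComs through the context. A sketch of
the pattern:

    from unittest import mock

    # A MagicMock stands in for the Airflow Context so that
    # BigQueryTableLink.persist() can call task_instance.xcom_push(context, ...)
    # without a real TaskInstance behind it; execute(None) would now fail here.
    operator.execute(context=mock.MagicMock())
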
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py b/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py
index a3995461ff..109cafc4a3 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py
@@ -46,7 +46,7 @@ class TestBigQueryToBigQueryOperator(unittest.TestCase):
             encryption_configuration=encryption_configuration,
         )
 
-        operator.execute(None)
+        operator.execute(context=mock.MagicMock())
         mock_hook.return_value.run_copy.assert_called_once_with(
             source_project_dataset_tables=source_project_dataset_tables,
             destination_project_dataset_table=destination_project_dataset_table,
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
index 4542172649..5ed9b66031 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py
@@ -49,7 +49,7 @@ class TestBigQueryToGCSOperator(unittest.TestCase):
             labels=labels,
         )
 
-        operator.execute(None)
+        operator.execute(context=mock.MagicMock())
 
         mock_hook.return_value.run_extract.assert_called_once_with(
             source_project_dataset_table=source_project_dataset_table,
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py b/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
index 5f88c0cedf..fdae853810 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_mssql.py
@@ -38,7 +38,7 @@ class TestBigQueryToMsSqlOperator(unittest.TestCase):
             replace=False,
         )
 
-        operator.execute(None)
+        operator.execute(context=mock.MagicMock())
         # fmt: off
         mock_hook.return_value.list_rows.assert_called_once_with(
             dataset_id=TEST_DATASET,