You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/04 21:40:22 UTC

[airflow] branch main updated: Datacatalog assets & system tests migration (AIP-47) (#24600)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9227d56e68 Datacatalog assets & system tests migration (AIP-47) (#24600)
9227d56e68 is described below

commit 9227d56e681a40e9caf2eefae87f7046c0d0c9f4
Author: Wojciech Januszek <wj...@sigma.ug.edu.pl>
AuthorDate: Mon Jul 4 23:39:59 2022 +0200

    Datacatalog assets & system tests migration (AIP-47) (#24600)
---
 .../cloud/example_dags/example_datacatalog.py      | 452 ---------------------
 .../providers/google/cloud/links/datacatalog.py    | 112 +++++
 .../google/cloud/operators/datacatalog.py          | 165 +++++++-
 airflow/providers/google/provider.yaml             |   3 +
 .../operators/cloud/datacatalog.rst                | 138 +++----
 docs/spelling_wordlist.txt                         |   1 +
 .../google/cloud/operators/test_datacatalog.py     | 148 +++++--
 .../providers/google/datacatalog/__init__.py}      |  18 -
 .../datacatalog/example_datacatalog_entries.py     | 209 ++++++++++
 .../example_datacatalog_search_catalog.py          | 227 +++++++++++
 .../example_datacatalog_tag_templates.py           | 192 +++++++++
 .../google/datacatalog/example_datacatalog_tags.py | 239 +++++++++++
 12 files changed, 1312 insertions(+), 592 deletions(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_datacatalog.py b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
deleted file mode 100644
index 848cd5ef90..0000000000
--- a/airflow/providers/google/cloud/example_dags/example_datacatalog.py
+++ /dev/null
@@ -1,452 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG that interacts with Google Data Catalog service
-"""
-import os
-from datetime import datetime
-
-from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField
-from google.protobuf.field_mask_pb2 import FieldMask
-
-from airflow import models
-from airflow.models.baseoperator import chain
-from airflow.operators.bash import BashOperator
-from airflow.providers.google.cloud.operators.datacatalog import (
-    CloudDataCatalogCreateEntryGroupOperator,
-    CloudDataCatalogCreateEntryOperator,
-    CloudDataCatalogCreateTagOperator,
-    CloudDataCatalogCreateTagTemplateFieldOperator,
-    CloudDataCatalogCreateTagTemplateOperator,
-    CloudDataCatalogDeleteEntryGroupOperator,
-    CloudDataCatalogDeleteEntryOperator,
-    CloudDataCatalogDeleteTagOperator,
-    CloudDataCatalogDeleteTagTemplateFieldOperator,
-    CloudDataCatalogDeleteTagTemplateOperator,
-    CloudDataCatalogGetEntryGroupOperator,
-    CloudDataCatalogGetEntryOperator,
-    CloudDataCatalogGetTagTemplateOperator,
-    CloudDataCatalogListTagsOperator,
-    CloudDataCatalogLookupEntryOperator,
-    CloudDataCatalogRenameTagTemplateFieldOperator,
-    CloudDataCatalogSearchCatalogOperator,
-    CloudDataCatalogUpdateEntryOperator,
-    CloudDataCatalogUpdateTagOperator,
-    CloudDataCatalogUpdateTagTemplateFieldOperator,
-    CloudDataCatalogUpdateTagTemplateOperator,
-)
-
-PROJECT_ID = os.getenv("GCP_PROJECT_ID")
-BUCKET_ID = os.getenv("GCP_TEST_DATA_BUCKET", "INVALID BUCKET NAME")
-LOCATION = "us-central1"
-ENTRY_GROUP_ID = "important_data_jan_2019"
-ENTRY_ID = "python_files"
-TEMPLATE_ID = "template_id"
-FIELD_NAME_1 = "first"
-FIELD_NAME_2 = "second"
-FIELD_NAME_3 = "first-rename"
-
-with models.DAG(
-    "example_gcp_datacatalog",
-    schedule_interval='@once',
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-) as dag:
-    # Create
-    # [START howto_operator_gcp_datacatalog_create_entry_group]
-    create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
-        task_id="create_entry_group",
-        location=LOCATION,
-        entry_group_id=ENTRY_GROUP_ID,
-        entry_group={"display_name": "analytics data - jan 2011"},
-    )
-    # [END howto_operator_gcp_datacatalog_create_entry_group]
-
-    # [START howto_operator_gcp_datacatalog_create_entry_group_result]
-    create_entry_group_result = BashOperator(
-        task_id="create_entry_group_result",
-        bash_command=f"echo {create_entry_group.output['entry_group_id']}",
-    )
-    # [END howto_operator_gcp_datacatalog_create_entry_group_result]
-
-    # [START howto_operator_gcp_datacatalog_create_entry_group_result2]
-    create_entry_group_result2 = BashOperator(
-        task_id="create_entry_group_result2",
-        bash_command=f"echo {create_entry_group.output}",
-    )
-    # [END howto_operator_gcp_datacatalog_create_entry_group_result2]
-
-    # [START howto_operator_gcp_datacatalog_create_entry_gcs]
-    create_entry_gcs = CloudDataCatalogCreateEntryOperator(
-        task_id="create_entry_gcs",
-        location=LOCATION,
-        entry_group=ENTRY_GROUP_ID,
-        entry_id=ENTRY_ID,
-        entry={
-            "display_name": "Wizard",
-            "type_": "FILESET",
-            "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_ID}/**"]},
-        },
-    )
-    # [END howto_operator_gcp_datacatalog_create_entry_gcs]
-
-    # [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
-    create_entry_gcs_result = BashOperator(
-        task_id="create_entry_gcs_result",
-        bash_command=f"echo {create_entry_gcs.output['entry_id']}",
-    )
-    # [END howto_operator_gcp_datacatalog_create_entry_gcs_result]
-
-    # [START howto_operator_gcp_datacatalog_create_entry_gcs_result2]
-    create_entry_gcs_result2 = BashOperator(
-        task_id="create_entry_gcs_result2",
-        bash_command=f"echo {create_entry_gcs.output}",
-    )
-    # [END howto_operator_gcp_datacatalog_create_entry_gcs_result2]
-
-    # [START howto_operator_gcp_datacatalog_create_tag]
-    create_tag = CloudDataCatalogCreateTagOperator(
-        task_id="create_tag",
-        location=LOCATION,
-        entry_group=ENTRY_GROUP_ID,
-        entry=ENTRY_ID,
-        template_id=TEMPLATE_ID,
-        tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
-    )
-    # [END howto_operator_gcp_datacatalog_create_tag]
-
-    # [START howto_operator_gcp_datacatalog_create_tag_result]
-    create_tag_result = BashOperator(
-        task_id="create_tag_result",
-        bash_command=f"echo {create_tag.output['tag_id']}",
-    )
-    # [END howto_operator_gcp_datacatalog_create_tag_result]
-
-    # [START howto_operator_gcp_datacatalog_create_tag_result2]
-    create_tag_result2 = BashOperator(task_id="create_tag_result2", bash_command=f"echo {create_tag.output}")
-    # [END howto_operator_gcp_datacatalog_create_tag_result2]
-
-    # [START howto_operator_gcp_datacatalog_create_tag_template]
-    create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
-        task_id="create_tag_template",
-        location=LOCATION,
-        tag_template_id=TEMPLATE_ID,
-        tag_template={
-            "display_name": "Awesome Tag Template",
-            "fields": {
-                FIELD_NAME_1: TagTemplateField(
-                    display_name="first-field", type_=dict(primitive_type="STRING")
-                )
-            },
-        },
-    )
-    # [END howto_operator_gcp_datacatalog_create_tag_template]
-
-    # [START howto_operator_gcp_datacatalog_create_tag_template_result]
-    create_tag_template_result = BashOperator(
-        task_id="create_tag_template_result",
-        bash_command=f"echo {create_tag_template.output['tag_template_id']}",
-    )
-    # [END howto_operator_gcp_datacatalog_create_tag_template_result]
-
-    # [START howto_operator_gcp_datacatalog_create_tag_template_result2]
-    create_tag_template_result2 = BashOperator(
-        task_id="create_tag_template_result2",
-        bash_command=f"echo {create_tag_template.output}",
-    )
-    # [END howto_operator_gcp_datacatalog_create_tag_template_result2]
-
-    # [START howto_operator_gcp_datacatalog_create_tag_template_field]
-    create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
-        task_id="create_tag_template_field",
-        location=LOCATION,
-        tag_template=TEMPLATE_ID,
-        tag_template_field_id=FIELD_NAME_2,
-        tag_template_field=TagTemplateField(
-            display_name="second-field", type_=FieldType(primitive_type="STRING")
-        ),
-    )
-    # [END howto_operator_gcp_datacatalog_create_tag_template_field]
-
-    # [START howto_operator_gcp_datacatalog_create_tag_template_field_result]
-    create_tag_template_field_result = BashOperator(
-        task_id="create_tag_template_field_result",
-        bash_command=f"echo {create_tag_template_field.output['tag_template_field_id']}",
-    )
-    # [END howto_operator_gcp_datacatalog_create_tag_template_field_result]
-
-    # [START howto_operator_gcp_datacatalog_create_tag_template_field_result2]
-    create_tag_template_field_result2 = BashOperator(
-        task_id="create_tag_template_field_result2",
-        bash_command=f"echo {create_tag_template_field.output}",
-    )
-    # [END howto_operator_gcp_datacatalog_create_tag_template_field_result2]
-
-    # Delete
-    # [START howto_operator_gcp_datacatalog_delete_entry]
-    delete_entry = CloudDataCatalogDeleteEntryOperator(
-        task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
-    )
-    # [END howto_operator_gcp_datacatalog_delete_entry]
-
-    # [START howto_operator_gcp_datacatalog_delete_entry_group]
-    delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
-        task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
-    )
-    # [END howto_operator_gcp_datacatalog_delete_entry_group]
-
-    # [START howto_operator_gcp_datacatalog_delete_tag]
-    delete_tag = CloudDataCatalogDeleteTagOperator(
-        task_id="delete_tag",
-        location=LOCATION,
-        entry_group=ENTRY_GROUP_ID,
-        entry=ENTRY_ID,
-        tag=create_tag.output["tag_id"],
-    )
-    # [END howto_operator_gcp_datacatalog_delete_tag]
-
-    # [START howto_operator_gcp_datacatalog_delete_tag_template_field]
-    delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
-        task_id="delete_tag_template_field",
-        location=LOCATION,
-        tag_template=TEMPLATE_ID,
-        field=FIELD_NAME_2,
-        force=True,
-    )
-    # [END howto_operator_gcp_datacatalog_delete_tag_template_field]
-
-    # [START howto_operator_gcp_datacatalog_delete_tag_template]
-    delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
-        task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
-    )
-    # [END howto_operator_gcp_datacatalog_delete_tag_template]
-
-    # Get
-    # [START howto_operator_gcp_datacatalog_get_entry_group]
-    get_entry_group = CloudDataCatalogGetEntryGroupOperator(
-        task_id="get_entry_group",
-        location=LOCATION,
-        entry_group=ENTRY_GROUP_ID,
-        read_mask=FieldMask(paths=["name", "display_name"]),
-    )
-    # [END howto_operator_gcp_datacatalog_get_entry_group]
-
-    # [START howto_operator_gcp_datacatalog_get_entry_group_result]
-    get_entry_group_result = BashOperator(
-        task_id="get_entry_group_result",
-        bash_command=f"echo {get_entry_group.output}",
-    )
-    # [END howto_operator_gcp_datacatalog_get_entry_group_result]
-
-    # [START howto_operator_gcp_datacatalog_get_entry]
-    get_entry = CloudDataCatalogGetEntryOperator(
-        task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
-    )
-    # [END howto_operator_gcp_datacatalog_get_entry]
-
-    # [START howto_operator_gcp_datacatalog_get_entry_result]
-    get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")
-    # [END howto_operator_gcp_datacatalog_get_entry_result]
-
-    # [START howto_operator_gcp_datacatalog_get_tag_template]
-    get_tag_template = CloudDataCatalogGetTagTemplateOperator(
-        task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
-    )
-    # [END howto_operator_gcp_datacatalog_get_tag_template]
-
-    # [START howto_operator_gcp_datacatalog_get_tag_template_result]
-    get_tag_template_result = BashOperator(
-        task_id="get_tag_template_result",
-        bash_command=f"{get_tag_template.output}",
-    )
-    # [END howto_operator_gcp_datacatalog_get_tag_template_result]
-
-    # List
-    # [START howto_operator_gcp_datacatalog_list_tags]
-    list_tags = CloudDataCatalogListTagsOperator(
-        task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
-    )
-    # [END howto_operator_gcp_datacatalog_list_tags]
-
-    # [START howto_operator_gcp_datacatalog_list_tags_result]
-    list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")
-    # [END howto_operator_gcp_datacatalog_list_tags_result]
-
-    # Lookup
-    # [START howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
-    current_entry_template = (
-        "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
-        "entryGroups/{entry_group}/entries/{entry}"
-    )
-    lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
-        task_id="lookup_entry",
-        linked_resource=current_entry_template.format(
-            project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
-        ),
-    )
-    # [END howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
-
-    # [START howto_operator_gcp_datacatalog_lookup_entry_result]
-    lookup_entry_result = BashOperator(
-        task_id="lookup_entry_result",
-        bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
-    )
-    # [END howto_operator_gcp_datacatalog_lookup_entry_result]
-
-    # Rename
-    # [START howto_operator_gcp_datacatalog_rename_tag_template_field]
-    rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
-        task_id="rename_tag_template_field",
-        location=LOCATION,
-        tag_template=TEMPLATE_ID,
-        field=FIELD_NAME_1,
-        new_tag_template_field_id=FIELD_NAME_3,
-    )
-    # [END howto_operator_gcp_datacatalog_rename_tag_template_field]
-
-    # Search
-    # [START howto_operator_gcp_datacatalog_search_catalog]
-    search_catalog = CloudDataCatalogSearchCatalogOperator(
-        task_id="search_catalog", scope={"include_project_ids": [PROJECT_ID]}, query=f"projectid:{PROJECT_ID}"
-    )
-    # [END howto_operator_gcp_datacatalog_search_catalog]
-
-    # [START howto_operator_gcp_datacatalog_search_catalog_result]
-    search_catalog_result = BashOperator(
-        task_id="search_catalog_result",
-        bash_command=f"echo {search_catalog.output}",
-    )
-    # [END howto_operator_gcp_datacatalog_search_catalog_result]
-
-    # Update
-    # [START howto_operator_gcp_datacatalog_update_entry]
-    update_entry = CloudDataCatalogUpdateEntryOperator(
-        task_id="update_entry",
-        entry={"display_name": "New Wizard"},
-        update_mask={"paths": ["display_name"]},
-        location=LOCATION,
-        entry_group=ENTRY_GROUP_ID,
-        entry_id=ENTRY_ID,
-    )
-    # [END howto_operator_gcp_datacatalog_update_entry]
-
-    # [START howto_operator_gcp_datacatalog_update_tag]
-    update_tag = CloudDataCatalogUpdateTagOperator(
-        task_id="update_tag",
-        tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
-        update_mask={"paths": ["fields"]},
-        location=LOCATION,
-        entry_group=ENTRY_GROUP_ID,
-        entry=ENTRY_ID,
-        tag_id=f"{create_tag.output['tag_id']}",
-    )
-    # [END howto_operator_gcp_datacatalog_update_tag]
-
-    # [START howto_operator_gcp_datacatalog_update_tag_template]
-    update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
-        task_id="update_tag_template",
-        tag_template={"display_name": "Awesome Tag Template"},
-        update_mask={"paths": ["display_name"]},
-        location=LOCATION,
-        tag_template_id=TEMPLATE_ID,
-    )
-    # [END howto_operator_gcp_datacatalog_update_tag_template]
-
-    # [START howto_operator_gcp_datacatalog_update_tag_template_field]
-    update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
-        task_id="update_tag_template_field",
-        tag_template_field={"display_name": "Updated template field"},
-        update_mask={"paths": ["display_name"]},
-        location=LOCATION,
-        tag_template=TEMPLATE_ID,
-        tag_template_field_id=FIELD_NAME_1,
-    )
-    # [END howto_operator_gcp_datacatalog_update_tag_template_field]
-
-    # Create
-    create_tasks = [
-        create_entry_group,
-        create_entry_gcs,
-        create_tag_template,
-        create_tag_template_field,
-        create_tag,
-    ]
-    chain(*create_tasks)
-
-    create_entry_group >> delete_entry_group
-    create_entry_group >> create_entry_group_result
-    create_entry_group >> create_entry_group_result2
-
-    create_entry_gcs >> delete_entry
-    create_entry_gcs >> create_entry_gcs_result
-    create_entry_gcs >> create_entry_gcs_result2
-
-    create_tag_template >> delete_tag_template_field
-    create_tag_template >> create_tag_template_result
-    create_tag_template >> create_tag_template_result2
-
-    create_tag_template_field >> delete_tag_template_field
-    create_tag_template_field >> create_tag_template_field_result
-    create_tag_template_field >> create_tag_template_field_result2
-
-    create_tag >> delete_tag
-    create_tag >> create_tag_result
-    create_tag >> create_tag_result2
-
-    # Delete
-    delete_tasks = [
-        delete_tag,
-        delete_tag_template_field,
-        delete_tag_template,
-        delete_entry,
-        delete_entry_group,
-    ]
-    chain(*delete_tasks)
-
-    # Get
-    create_tag_template >> get_tag_template >> delete_tag_template
-    get_tag_template >> get_tag_template_result
-
-    create_entry_gcs >> get_entry >> delete_entry
-    get_entry >> get_entry_result
-
-    create_entry_group >> get_entry_group >> delete_entry_group
-    get_entry_group >> get_entry_group_result
-
-    # List
-    create_tag >> list_tags >> delete_tag
-    list_tags >> list_tags_result
-
-    # Lookup
-    create_entry_gcs >> lookup_entry_linked_resource >> delete_entry
-    lookup_entry_linked_resource >> lookup_entry_result
-
-    # Rename
-    update_tag >> rename_tag_template_field
-    create_tag_template_field >> rename_tag_template_field >> delete_tag_template_field
-
-    # Search
-    chain(create_tasks, search_catalog, delete_tasks)
-    search_catalog >> search_catalog_result
-
-    # Update
-    create_entry_gcs >> update_entry >> delete_entry
-    create_tag >> update_tag >> delete_tag
-    create_tag_template >> update_tag_template >> delete_tag_template
-    create_tag_template_field >> update_tag_template_field >> rename_tag_template_field
diff --git a/airflow/providers/google/cloud/links/datacatalog.py b/airflow/providers/google/cloud/links/datacatalog.py
new file mode 100644
index 0000000000..6df0f36d82
--- /dev/null
+++ b/airflow/providers/google/cloud/links/datacatalog.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Data Catalog links."""
+from typing import TYPE_CHECKING, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+DATACATALOG_BASE_LINK = "https://console.cloud.google.com/datacatalog"
+ENTRY_GROUP_LINK = (
+    DATACATALOG_BASE_LINK
+    + "/groups/{entry_group_id};container={project_id};location={location_id}?project={project_id}"
+)
+ENTRY_LINK = (
+    DATACATALOG_BASE_LINK
+    + "/projects/{project_id}/locations/{location_id}/entryGroups/{entry_group_id}/entries/{entry_id}\
+    ?project={project_id}"
+)
+TAG_TEMPLATE_LINK = (
+    DATACATALOG_BASE_LINK
+    + "/projects/{project_id}/locations/{location_id}/tagTemplates/{tag_template_id}?project={project_id}"
+)
+
+
+class DataCatalogEntryGroupLink(BaseGoogleLink):
+    """Helper class for constructing Data Catalog Entry Group Link"""
+
+    name = "Data Catalog Entry Group"
+    key = "data_catalog_entry_group"
+    format_str = ENTRY_GROUP_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        entry_group_id: str,
+        location_id: str,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=DataCatalogEntryGroupLink.key,
+            value={"entry_group_id": entry_group_id, "location_id": location_id, "project_id": project_id},
+        )
+
+
+class DataCatalogEntryLink(BaseGoogleLink):
+    """Helper class for constructing Data Catalog Entry Link"""
+
+    name = "Data Catalog Entry"
+    key = "data_catalog_entry"
+    format_str = ENTRY_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        entry_id: str,
+        entry_group_id: str,
+        location_id: str,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=DataCatalogEntryLink.key,
+            value={
+                "entry_id": entry_id,
+                "entry_group_id": entry_group_id,
+                "location_id": location_id,
+                "project_id": project_id,
+            },
+        )
+
+
+class DataCatalogTagTemplateLink(BaseGoogleLink):
+    """Helper class for constructing Data Catalog Tag Template Link"""
+
+    name = "Data Catalog Tag Template"
+    key = "data_catalog_tag_template"
+    format_str = TAG_TEMPLATE_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        tag_template_id: str,
+        location_id: str,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=DataCatalogTagTemplateLink.key,
+            value={"tag_template_id": tag_template_id, "location_id": location_id, "project_id": project_id},
+        )
diff --git a/airflow/providers/google/cloud/operators/datacatalog.py b/airflow/providers/google/cloud/operators/datacatalog.py
index 145aeaa4c6..20bb980591 100644
--- a/airflow/providers/google/cloud/operators/datacatalog.py
+++ b/airflow/providers/google/cloud/operators/datacatalog.py
@@ -20,11 +20,12 @@ from typing import TYPE_CHECKING, Dict, Optional, Sequence, Tuple, Union
 from google.api_core.exceptions import AlreadyExists, NotFound
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry
-from google.cloud.datacatalog_v1beta1 import DataCatalogClient, SearchCatalogResult
-from google.cloud.datacatalog_v1beta1.types import (
+from google.cloud.datacatalog import (
+    DataCatalogClient,
     Entry,
     EntryGroup,
     SearchCatalogRequest,
+    SearchCatalogResult,
     Tag,
     TagTemplate,
     TagTemplateField,
@@ -33,6 +34,11 @@ from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.datacatalog import CloudDataCatalogHook
+from airflow.providers.google.cloud.links.datacatalog import (
+    DataCatalogEntryGroupLink,
+    DataCatalogEntryLink,
+    DataCatalogTagTemplateLink,
+)
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -87,6 +93,7 @@ class CloudDataCatalogCreateEntryOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogEntryLink(),)
 
     def __init__(
         self,
@@ -143,7 +150,15 @@ class CloudDataCatalogCreateEntryOperator(BaseOperator):
             )
         _, _, entry_id = result.name.rpartition("/")
         self.log.info("Current entry_id ID: %s", entry_id)
-        context["task_instance"].xcom_push(key="entry_id", value=entry_id)
+        self.xcom_push(context, key="entry_id", value=entry_id)
+        DataCatalogEntryLink.persist(
+            context=context,
+            task_instance=self,
+            entry_id=self.entry_id,
+            entry_group_id=self.entry_group,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return Entry.to_dict(result)
 
 
@@ -195,6 +210,7 @@ class CloudDataCatalogCreateEntryGroupOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogEntryGroupLink(),)
 
     def __init__(
         self,
@@ -248,7 +264,14 @@ class CloudDataCatalogCreateEntryGroupOperator(BaseOperator):
 
         _, _, entry_group_id = result.name.rpartition("/")
         self.log.info("Current entry group ID: %s", entry_group_id)
-        context["task_instance"].xcom_push(key="entry_group_id", value=entry_group_id)
+        self.xcom_push(context, key="entry_group_id", value=entry_group_id)
+        DataCatalogEntryGroupLink.persist(
+            context=context,
+            task_instance=self,
+            entry_group_id=self.entry_group_id,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return EntryGroup.to_dict(result)
 
 
@@ -301,6 +324,7 @@ class CloudDataCatalogCreateTagOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogEntryLink(),)
 
     def __init__(
         self,
@@ -375,7 +399,15 @@ class CloudDataCatalogCreateTagOperator(BaseOperator):
 
         _, _, tag_id = tag.name.rpartition("/")
         self.log.info("Current Tag ID: %s", tag_id)
-        context["task_instance"].xcom_push(key="tag_id", value=tag_id)
+        self.xcom_push(context, key="tag_id", value=tag_id)
+        DataCatalogEntryLink.persist(
+            context=context,
+            task_instance=self,
+            entry_id=self.entry,
+            entry_group_id=self.entry_group,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return Tag.to_dict(tag)
 
 
@@ -425,6 +457,7 @@ class CloudDataCatalogCreateTagTemplateOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogTagTemplateLink(),)
 
     def __init__(
         self,
@@ -477,7 +510,14 @@ class CloudDataCatalogCreateTagTemplateOperator(BaseOperator):
             )
         _, _, tag_template = result.name.rpartition("/")
         self.log.info("Current Tag ID: %s", tag_template)
-        context["task_instance"].xcom_push(key="tag_template_id", value=tag_template)
+        self.xcom_push(context, key="tag_template_id", value=tag_template)
+        DataCatalogTagTemplateLink.persist(
+            context=context,
+            task_instance=self,
+            tag_template_id=self.tag_template_id,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return TagTemplate.to_dict(result)
 
 
@@ -532,6 +572,7 @@ class CloudDataCatalogCreateTagTemplateFieldOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogTagTemplateLink(),)
 
     def __init__(
         self,
@@ -588,7 +629,14 @@ class CloudDataCatalogCreateTagTemplateFieldOperator(BaseOperator):
             result = tag_template.fields[self.tag_template_field_id]
 
         self.log.info("Current Tag ID: %s", self.tag_template_field_id)
-        context["task_instance"].xcom_push(key="tag_template_field_id", value=self.tag_template_field_id)
+        self.xcom_push(context, key="tag_template_field_id", value=self.tag_template_field_id)
+        DataCatalogTagTemplateLink.persist(
+            context=context,
+            task_instance=self,
+            tag_template_id=self.tag_template,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return TagTemplateField.to_dict(result)
 
 
@@ -1067,6 +1115,7 @@ class CloudDataCatalogGetEntryOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogEntryLink(),)
 
     def __init__(
         self,
@@ -1106,6 +1155,14 @@ class CloudDataCatalogGetEntryOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        DataCatalogEntryLink.persist(
+            context=context,
+            task_instance=self,
+            entry_id=self.entry,
+            entry_group_id=self.entry_group,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return Entry.to_dict(result)
 
 
@@ -1153,6 +1210,7 @@ class CloudDataCatalogGetEntryGroupOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogEntryGroupLink(),)
 
     def __init__(
         self,
@@ -1192,6 +1250,13 @@ class CloudDataCatalogGetEntryGroupOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        DataCatalogEntryGroupLink.persist(
+            context=context,
+            task_instance=self,
+            entry_group_id=self.entry_group,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return EntryGroup.to_dict(result)
 
 
@@ -1234,6 +1299,7 @@ class CloudDataCatalogGetTagTemplateOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogTagTemplateLink(),)
 
     def __init__(
         self,
@@ -1270,6 +1336,13 @@ class CloudDataCatalogGetTagTemplateOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        DataCatalogTagTemplateLink.persist(
+            context=context,
+            task_instance=self,
+            tag_template_id=self.tag_template,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return TagTemplate.to_dict(result)
 
 
@@ -1319,6 +1392,7 @@ class CloudDataCatalogListTagsOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogEntryLink(),)
 
     def __init__(
         self,
@@ -1361,6 +1435,14 @@ class CloudDataCatalogListTagsOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        DataCatalogEntryLink.persist(
+            context=context,
+            task_instance=self,
+            entry_id=self.entry,
+            entry_group_id=self.entry_group,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
         return [Tag.to_dict(item) for item in result]
 
 
@@ -1406,6 +1488,7 @@ class CloudDataCatalogLookupEntryOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogEntryLink(),)
 
     def __init__(
         self,
@@ -1441,6 +1524,16 @@ class CloudDataCatalogLookupEntryOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+
+        project_id, location_id, entry_group_id, entry_id = result.name.split('/')[1::2]
+        DataCatalogEntryLink.persist(
+            context=context,
+            task_instance=self,
+            entry_id=entry_id,
+            entry_group_id=entry_group_id,
+            location_id=location_id,
+            project_id=project_id,
+        )
         return Entry.to_dict(result)
 
 
@@ -1489,6 +1582,7 @@ class CloudDataCatalogRenameTagTemplateFieldOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogTagTemplateLink(),)
 
     def __init__(
         self,
@@ -1531,6 +1625,13 @@ class CloudDataCatalogRenameTagTemplateFieldOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        DataCatalogTagTemplateLink.persist(
+            context=context,
+            task_instance=self,
+            tag_template_id=self.tag_template,
+            location_id=self.location,
+            project_id=self.project_id or hook.project_id,
+        )
 
 
 class CloudDataCatalogSearchCatalogOperator(BaseOperator):
@@ -1695,6 +1796,7 @@ class CloudDataCatalogUpdateEntryOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogEntryLink(),)
 
     def __init__(
         self,
@@ -1729,7 +1831,7 @@ class CloudDataCatalogUpdateEntryOperator(BaseOperator):
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        hook.update_entry(
+        result = hook.update_entry(
             entry=self.entry,
             update_mask=self.update_mask,
             location=self.location,
@@ -1741,6 +1843,16 @@ class CloudDataCatalogUpdateEntryOperator(BaseOperator):
             metadata=self.metadata,
         )
 
+        location_id, entry_group_id, entry_id = result.name.split("/")[3::2]
+        DataCatalogEntryLink.persist(
+            context=context,
+            task_instance=self,
+            entry_id=self.entry_id or entry_id,
+            entry_group_id=self.entry_group or entry_group_id,
+            location_id=self.location or location_id,
+            project_id=self.project_id or hook.project_id,
+        )
+
 
 class CloudDataCatalogUpdateTagOperator(BaseOperator):
     """
@@ -1795,6 +1907,7 @@ class CloudDataCatalogUpdateTagOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogEntryLink(),)
 
     def __init__(
         self,
@@ -1831,7 +1944,7 @@ class CloudDataCatalogUpdateTagOperator(BaseOperator):
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        hook.update_tag(
+        result = hook.update_tag(
             tag=self.tag,
             update_mask=self.update_mask,
             location=self.location,
@@ -1844,6 +1957,16 @@ class CloudDataCatalogUpdateTagOperator(BaseOperator):
             metadata=self.metadata,
         )
 
+        location_id, entry_group_id, entry_id = result.name.split("/")[3:8:2]
+        DataCatalogEntryLink.persist(
+            context=context,
+            task_instance=self,
+            entry_id=self.entry or entry_id,
+            entry_group_id=self.entry_group or entry_group_id,
+            location_id=self.location or location_id,
+            project_id=self.project_id or hook.project_id,
+        )
+
 
 class CloudDataCatalogUpdateTagTemplateOperator(BaseOperator):
     """
@@ -1900,6 +2023,7 @@ class CloudDataCatalogUpdateTagTemplateOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogTagTemplateLink(),)
 
     def __init__(
         self,
@@ -1932,7 +2056,7 @@ class CloudDataCatalogUpdateTagTemplateOperator(BaseOperator):
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        hook.update_tag_template(
+        result = hook.update_tag_template(
             tag_template=self.tag_template,
             update_mask=self.update_mask,
             location=self.location,
@@ -1943,6 +2067,15 @@ class CloudDataCatalogUpdateTagTemplateOperator(BaseOperator):
             metadata=self.metadata,
         )
 
+        location_id, tag_template_id = result.name.split("/")[3::2]
+        DataCatalogTagTemplateLink.persist(
+            context=context,
+            task_instance=self,
+            tag_template_id=self.tag_template_id or tag_template_id,
+            location_id=self.location or location_id,
+            project_id=self.project_id or hook.project_id,
+        )
+
 
 class CloudDataCatalogUpdateTagTemplateFieldOperator(BaseOperator):
     """
@@ -2005,6 +2138,7 @@ class CloudDataCatalogUpdateTagTemplateFieldOperator(BaseOperator):
         "gcp_conn_id",
         "impersonation_chain",
     )
+    operator_extra_links = (DataCatalogTagTemplateLink(),)
 
     def __init__(
         self,
@@ -2041,7 +2175,7 @@ class CloudDataCatalogUpdateTagTemplateFieldOperator(BaseOperator):
         hook = CloudDataCatalogHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        hook.update_tag_template_field(
+        result = hook.update_tag_template_field(
             tag_template_field=self.tag_template_field,
             update_mask=self.update_mask,
             tag_template_field_name=self.tag_template_field_name,
@@ -2053,3 +2187,12 @@ class CloudDataCatalogUpdateTagTemplateFieldOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+
+        location_id, tag_template_id = result.name.split("/")[3:6:2]
+        DataCatalogTagTemplateLink.persist(
+            context=context,
+            task_instance=self,
+            tag_template_id=self.tag_template or tag_template_id,
+            location_id=self.location or location_id,
+            project_id=self.project_id or hook.project_id,
+        )
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 225333d543..a4962fb714 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -940,6 +940,9 @@ extra-links:
   - airflow.providers.google.cloud.links.bigquery_dts.BigQueryDataTransferConfigLink
   - airflow.providers.google.cloud.links.cloud_tasks.CloudTasksQueueLink
   - airflow.providers.google.cloud.links.cloud_tasks.CloudTasksLink
+  - airflow.providers.google.cloud.links.datacatalog.DataCatalogEntryGroupLink
+  - airflow.providers.google.cloud.links.datacatalog.DataCatalogEntryLink
+  - airflow.providers.google.cloud.links.datacatalog.DataCatalogTagTemplateLink
   - airflow.providers.google.cloud.links.dataproc.DataprocLink
   - airflow.providers.google.cloud.links.dataproc.DataprocListLink
   - airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDetailedLink
diff --git a/docs/apache-airflow-providers-google/operators/cloud/datacatalog.rst b/docs/apache-airflow-providers-google/operators/cloud/datacatalog.rst
index 01d409714a..2c51926f68 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/datacatalog.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/datacatalog.rst
@@ -59,7 +59,7 @@ operators.
 
 The ``CloudDataCatalogGetEntryOperator`` use Project ID, Entry Group ID, Entry ID to get the entry.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_get_entry]
@@ -71,7 +71,7 @@ parameters which allows you to dynamically determine values.
 
 The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_get_entry_result]
@@ -79,7 +79,7 @@ The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used b
 
 The ``CloudDataCatalogLookupEntryOperator`` use the resource name to get the entry.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
@@ -91,7 +91,7 @@ parameters which allows you to dynamically determine values.
 
 The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_lookup_entry_result]
@@ -105,7 +105,7 @@ Creating an entry
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateEntryOperator`
 operator create the entry.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_create_entry_gcs]
@@ -117,15 +117,9 @@ parameters which allows you to dynamically determine values.
 
 The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_gcp_datacatalog_create_entry_gcs_result2]
-    :end-before: [END howto_operator_gcp_datacatalog_create_entry_gcs_result2]
-
 The newly created entry ID can be read with the ``entry_id`` key.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
@@ -139,7 +133,7 @@ Updating an entry
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateEntryOperator`
 operator update the entry.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_update_entry]
@@ -157,7 +151,7 @@ Deleting a entry
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteEntryOperator`
 operator delete the entry.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_delete_entry]
@@ -186,7 +180,7 @@ Creating an entry group
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateEntryGroupOperator`
 operator create the entry group.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_create_entry_group]
@@ -198,19 +192,13 @@ parameters which allows you to dynamically determine values.
 
 The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result2]
-    :end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result2]
-
 The newly created entry group ID can be read with the ``entry_group_id`` key.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result2]
-    :end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result2]
+    :start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result]
+    :end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result]
 
 .. _howto/operator:CloudDataCatalogGetEntryGroupOperator:
 
@@ -220,7 +208,7 @@ Getting an entry group
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogGetEntryGroupOperator`
 operator get the entry group.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_get_entry_group]
@@ -232,7 +220,7 @@ parameters which allows you to dynamically determine values.
 
 The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_get_entry_group_result]
@@ -246,7 +234,7 @@ Deleting an entry group
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteEntryGroupOperator`
 operator delete the entry group.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_delete_entry_group]
@@ -258,8 +246,8 @@ parameters which allows you to dynamically determine values.
 
 .. _howto/operator:CloudDataCatalogTagTemplateOperators:
 
-Managing a tag templates
-^^^^^^^^^^^^^^^^^^^^^^^^
+Managing tag templates
+^^^^^^^^^^^^^^^^^^^^^^
 
 Operators uses a :class:`~google.cloud.datacatalog_v1beta1.types.TagTemplate` for representing a tag templates.
 
@@ -269,13 +257,13 @@ Operators uses a :class:`~google.cloud.datacatalog_v1beta1.types.TagTemplate` fo
 
 .. _howto/operator:CloudDataCatalogCreateTagTemplateOperator:
 
-Creating a tag templates
-""""""""""""""""""""""""
+Creating a tag template
+"""""""""""""""""""""""
 
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagTemplateOperator`
 operator get the tag template.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_create_tag_template]
@@ -287,15 +275,9 @@ parameters which allows you to dynamically determine values.
 
 The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_gcp_datacatalog_create_tag_template_result2]
-    :end-before: [END howto_operator_gcp_datacatalog_create_tag_template_result2]
-
 The newly created tag template ID can be read with the ``tag_template_id`` key.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_create_tag_template_result]
@@ -309,7 +291,7 @@ Deleting a tag template
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagTemplateOperator`
 operator delete the tag template.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_delete_tag_template]
@@ -328,7 +310,7 @@ Getting a tag template
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogGetTagTemplateOperator`
 operator get the tag template.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_get_tag_template]
@@ -340,7 +322,7 @@ parameters which allows you to dynamically determine values.
 
 The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_get_tag_template_result]
@@ -354,7 +336,7 @@ Updating a tag template
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagTemplateOperator`
 operator update the tag template.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_update_tag_template]
@@ -366,8 +348,8 @@ parameters which allows you to dynamically determine values.
 
 .. _howto/operator:CloudDataCatalogTagOperators:
 
-Managing a tags
-^^^^^^^^^^^^^^^
+Managing tags
+^^^^^^^^^^^^^
 
 Operators uses a :class:`~google.cloud.datacatalog_v1beta1.types.Tag` for representing a tag.
 
@@ -383,7 +365,7 @@ Creating a tag on an entry
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagOperator`
 operator get the tag template.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_create_tag]
@@ -395,33 +377,27 @@ parameters which allows you to dynamically determine values.
 
 The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_gcp_datacatalog_create_tag_result2]
-    :end-before: [END howto_operator_gcp_datacatalog_create_tag_result2]
-
 The newly created tag ID can be read with the ``tag_id`` key.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result2]
-    :end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result2]
+    :start-after: [START howto_operator_gcp_datacatalog_create_tag_result]
+    :end-before: [END howto_operator_gcp_datacatalog_create_tag_result]
 
 .. _howto/operator:CloudDataCatalogUpdateTagOperator:
 
-Updating an tag
-"""""""""""""""
+Updating a tag
+""""""""""""""
 
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagOperator`
 operator update the tag template.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_gcp_datacatalog_update_tag_template]
-    :end-before: [END howto_operator_gcp_datacatalog_update_tag_template]
+    :start-after: [START howto_operator_gcp_datacatalog_update_tag]
+    :end-before: [END howto_operator_gcp_datacatalog_update_tag]
 
 You can use :ref:`Jinja templating <concepts:jinja-templating>` with
 :template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagOperator`
@@ -429,17 +405,17 @@ parameters which allows you to dynamically determine values.
 
 .. _howto/operator:CloudDataCatalogDeleteTagOperator:
 
-Deleting an tag
-"""""""""""""""
+Deleting a tag
+""""""""""""""
 
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagOperator`
 operator delete the tag template.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_gcp_datacatalog_delete_tag_template]
-    :end-before: [END howto_operator_gcp_datacatalog_delete_tag_template]
+    :start-after: [START howto_operator_gcp_datacatalog_delete_tag]
+    :end-before: [END howto_operator_gcp_datacatalog_delete_tag]
 
 You can use :ref:`Jinja templating <concepts:jinja-templating>` with
 :template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagOperator`
@@ -447,13 +423,13 @@ parameters which allows you to dynamically determine values.
 
 .. _howto/operator:CloudDataCatalogListTagsOperator:
 
-Listing an tags on an entry
-"""""""""""""""""""""""""""
+Listing tags on an entry
+""""""""""""""""""""""""
 
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogListTagsOperator`
 operator get list of the tags on the entry.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_list_tags]
@@ -465,7 +441,7 @@ parameters which allows you to dynamically determine values.
 
 The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_list_tags_result]
@@ -491,7 +467,7 @@ Creating a field
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagTemplateFieldOperator`
 operator get the tag template field.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_create_tag_template_field]
@@ -503,19 +479,13 @@ parameters which allows you to dynamically determine values.
 
 The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_gcp_datacatalog_create_tag_template_field_result2]
-    :end-before: [END howto_operator_gcp_datacatalog_create_tag_template_field_result2]
-
 The newly created field ID can be read with the ``tag_template_field_id`` key.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result2]
-    :end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result2]
+    :start-after: [START howto_operator_gcp_datacatalog_create_tag_template_field_result]
+    :end-before: [END howto_operator_gcp_datacatalog_create_tag_template_field_result]
 
 .. _howto/operator:CloudDataCatalogRenameTagTemplateFieldOperator:
 
@@ -525,7 +495,7 @@ Renaming a field
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogRenameTagTemplateFieldOperator`
 operator rename the tag template field.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_rename_tag_template_field]
@@ -543,7 +513,7 @@ Updating a field
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagTemplateFieldOperator`
 operator get the tag template field.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_update_tag_template_field]
@@ -562,7 +532,7 @@ Deleting a field
 The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagTemplateFieldOperator`
 operator delete the tag template field.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_delete_tag_template_field]
@@ -583,7 +553,7 @@ operator searches Data Catalog for multiple resources like entries, tags that ma
 
 The ``query`` parameters should defined using `search syntax <https://cloud.google.com/data-catalog/docs/how-to/search-reference>`__.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_search_catalog]
@@ -595,7 +565,7 @@ parameters which allows you to dynamically determine values.
 
 The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_gcp_datacatalog_search_catalog_result]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 44362b9b93..df08668eec 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1135,6 +1135,7 @@ opsgenie
 optimise
 optionality
 ora
+oracledb
 orchestrator
 orgtbl
 orm
diff --git a/tests/providers/google/cloud/operators/test_datacatalog.py b/tests/providers/google/cloud/operators/test_datacatalog.py
index ff6f14a10f..e35024f5a1 100644
--- a/tests/providers/google/cloud/operators/test_datacatalog.py
+++ b/tests/providers/google/cloud/operators/test_datacatalog.py
@@ -21,7 +21,7 @@ from unittest import TestCase, mock
 from google.api_core.exceptions import AlreadyExists
 from google.api_core.gapic_v1.method import _MethodDefault
 from google.api_core.retry import Retry
-from google.cloud.datacatalog_v1beta1.types import Entry, EntryGroup, Tag, TagTemplate, TagTemplateField
+from google.cloud.datacatalog import Entry, EntryGroup, Tag, TagTemplate, TagTemplateField
 from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow.providers.google.cloud.operators.datacatalog import (
@@ -47,8 +47,8 @@ from airflow.providers.google.cloud.operators.datacatalog import (
     CloudDataCatalogUpdateTagTemplateFieldOperator,
     CloudDataCatalogUpdateTagTemplateOperator,
 )
-from airflow.utils.context import Context
 
+BASE_PATH = "airflow.providers.google.cloud.operators.datacatalog.{}"
 TEST_PROJECT_ID: str = "example_id"
 TEST_LOCATION: str = "en-west-3"
 TEST_ENTRY_ID: str = "test-entry-id"
@@ -94,6 +94,8 @@ TEST_ENTRY_DICT: Dict = {
     'description': '',
     'display_name': '',
     'linked_resource': '',
+    'fully_qualified_name': '',
+    'labels': {},
     'name': TEST_ENTRY_PATH,
 }
 TEST_ENTRY_GROUP: EntryGroup = EntryGroup(name=TEST_ENTRY_GROUP_PATH)
@@ -101,14 +103,24 @@ TEST_ENTRY_GROUP_DICT: Dict = {'description': '', 'display_name': '', 'name': TE
 TEST_TAG: Tag = Tag(name=TEST_TAG_PATH)
 TEST_TAG_DICT: Dict = {'fields': {}, 'name': TEST_TAG_PATH, 'template': '', 'template_display_name': ''}
 TEST_TAG_TEMPLATE: TagTemplate = TagTemplate(name=TEST_TAG_TEMPLATE_PATH)
-TEST_TAG_TEMPLATE_DICT: Dict = {'display_name': '', 'fields': {}, 'name': TEST_TAG_TEMPLATE_PATH}
+TEST_TAG_TEMPLATE_DICT: Dict = {
+    'display_name': '',
+    'fields': {},
+    'is_publicly_readable': False,
+    'name': TEST_TAG_TEMPLATE_PATH,
+}
 TEST_TAG_TEMPLATE_FIELD: TagTemplateField = TagTemplateField(name=TEST_TAG_TEMPLATE_FIELD_ID)
 TEST_TAG_TEMPLATE_FIELD_DICT: Dict = {
+    'description': '',
     'display_name': '',
     'is_required': False,
     'name': TEST_TAG_TEMPLATE_FIELD_ID,
     'order': 0,
 }
+TEST_ENTRY_LINK = "projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}/entries/{entry_id}"
+TEST_TAG_TEMPLATE_LINK = "projects/{project_id}/locations/{location}/tagTemplates/{tag_template_id}"
+TEST_TAG_TEMPLATE_FIELD_LINK = "projects/{project_id}/locations/{location}/tagTemplates/{tag_template_id}\
+    /fields/{tag_template_field_id}"
 
 
 class TestCloudDataCatalogCreateEntryOperator(TestCase):
@@ -116,7 +128,8 @@ class TestCloudDataCatalogCreateEntryOperator(TestCase):
         "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
         **{"return_value.create_entry.return_value": TEST_ENTRY},
     )
-    def test_assert_valid_hook_call(self, mock_hook) -> None:
+    @mock.patch(BASE_PATH.format("CloudDataCatalogCreateEntryOperator.xcom_push"))
+    def test_assert_valid_hook_call(self, mock_xcom, mock_hook) -> None:
         task = CloudDataCatalogCreateEntryOperator(
             task_id="task_id",
             location=TEST_LOCATION,
@@ -130,8 +143,8 @@ class TestCloudDataCatalogCreateEntryOperator(TestCase):
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
-        ti = mock.MagicMock()
-        result = task.execute(context=Context(task_instance=ti))
+        context = mock.MagicMock()
+        result = task.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -146,11 +159,21 @@ class TestCloudDataCatalogCreateEntryOperator(TestCase):
             timeout=TEST_TIMEOUT,
             metadata=TEST_METADATA,
         )
-        ti.xcom_push.assert_called_once_with(key="entry_id", value=TEST_ENTRY_ID)
+        mock_xcom.assert_called_with(
+            context,
+            key="data_catalog_entry",
+            value={
+                "entry_id": TEST_ENTRY_ID,
+                "entry_group_id": TEST_ENTRY_GROUP_ID,
+                "location_id": TEST_LOCATION,
+                "project_id": TEST_PROJECT_ID,
+            },
+        )
         assert TEST_ENTRY_DICT == result
 
     @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
-    def test_assert_valid_hook_call_when_exists(self, mock_hook) -> None:
+    @mock.patch(BASE_PATH.format("CloudDataCatalogCreateEntryOperator.xcom_push"))
+    def test_assert_valid_hook_call_when_exists(self, mock_xcom, mock_hook) -> None:
         mock_hook.return_value.create_entry.side_effect = AlreadyExists(message="message")
         mock_hook.return_value.get_entry.return_value = TEST_ENTRY
         task = CloudDataCatalogCreateEntryOperator(
@@ -166,8 +189,8 @@ class TestCloudDataCatalogCreateEntryOperator(TestCase):
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
-        ti = mock.MagicMock()
-        result = task.execute(context=Context(task_instance=ti))
+        context = mock.MagicMock()
+        result = task.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -191,7 +214,16 @@ class TestCloudDataCatalogCreateEntryOperator(TestCase):
             timeout=TEST_TIMEOUT,
             metadata=TEST_METADATA,
         )
-        ti.xcom_push.assert_called_once_with(key="entry_id", value=TEST_ENTRY_ID)
+        mock_xcom.assert_called_with(
+            context,
+            key="data_catalog_entry",
+            value={
+                "entry_id": TEST_ENTRY_ID,
+                "entry_group_id": TEST_ENTRY_GROUP_ID,
+                "location_id": TEST_LOCATION,
+                "project_id": TEST_PROJECT_ID,
+            },
+        )
         assert TEST_ENTRY_DICT == result
 
 
@@ -200,7 +232,8 @@ class TestCloudDataCatalogCreateEntryGroupOperator(TestCase):
         "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
         **{"return_value.create_entry_group.return_value": TEST_ENTRY_GROUP},
     )
-    def test_assert_valid_hook_call(self, mock_hook) -> None:
+    @mock.patch(BASE_PATH.format("CloudDataCatalogCreateEntryGroupOperator.xcom_push"))
+    def test_assert_valid_hook_call(self, mock_xcom, mock_hook) -> None:
         task = CloudDataCatalogCreateEntryGroupOperator(
             task_id="task_id",
             location=TEST_LOCATION,
@@ -213,8 +246,8 @@ class TestCloudDataCatalogCreateEntryGroupOperator(TestCase):
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
-        ti = mock.MagicMock()
-        result = task.execute(context=Context(task_instance=ti))
+        context = mock.MagicMock()
+        result = task.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -228,7 +261,15 @@ class TestCloudDataCatalogCreateEntryGroupOperator(TestCase):
             timeout=TEST_TIMEOUT,
             metadata=TEST_METADATA,
         )
-        ti.xcom_push.assert_called_once_with(key="entry_group_id", value=TEST_ENTRY_GROUP_ID)
+        mock_xcom.assert_called_with(
+            context,
+            key="data_catalog_entry_group",
+            value={
+                "entry_group_id": TEST_ENTRY_GROUP_ID,
+                "location_id": TEST_LOCATION,
+                "project_id": TEST_PROJECT_ID,
+            },
+        )
         assert result == TEST_ENTRY_GROUP_DICT
 
 
@@ -237,7 +278,8 @@ class TestCloudDataCatalogCreateTagOperator(TestCase):
         "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
         **{"return_value.create_tag.return_value": TEST_TAG},
     )
-    def test_assert_valid_hook_call(self, mock_hook) -> None:
+    @mock.patch(BASE_PATH.format("CloudDataCatalogCreateTagOperator.xcom_push"))
+    def test_assert_valid_hook_call(self, mock_xcom, mock_hook) -> None:
         task = CloudDataCatalogCreateTagOperator(
             task_id="task_id",
             location=TEST_LOCATION,
@@ -252,8 +294,8 @@ class TestCloudDataCatalogCreateTagOperator(TestCase):
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
-        ti = mock.MagicMock()
-        result = task.execute(context=Context(task_instance=ti))
+        context = mock.MagicMock()
+        result = task.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -269,7 +311,16 @@ class TestCloudDataCatalogCreateTagOperator(TestCase):
             timeout=TEST_TIMEOUT,
             metadata=TEST_METADATA,
         )
-        ti.xcom_push.assert_called_once_with(key="tag_id", value=TEST_TAG_ID)
+        mock_xcom.assert_called_with(
+            context,
+            key="data_catalog_entry",
+            value={
+                "entry_id": TEST_ENTRY_ID,
+                "entry_group_id": TEST_ENTRY_GROUP_ID,
+                "location_id": TEST_LOCATION,
+                "project_id": TEST_PROJECT_ID,
+            },
+        )
         assert TEST_TAG_DICT == result
 
 
@@ -278,7 +329,8 @@ class TestCloudDataCatalogCreateTagTemplateOperator(TestCase):
         "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
         **{"return_value.create_tag_template.return_value": TEST_TAG_TEMPLATE},
     )
-    def test_assert_valid_hook_call(self, mock_hook) -> None:
+    @mock.patch(BASE_PATH.format("CloudDataCatalogCreateTagTemplateOperator.xcom_push"))
+    def test_assert_valid_hook_call(self, mock_xcom, mock_hook) -> None:
         task = CloudDataCatalogCreateTagTemplateOperator(
             task_id="task_id",
             location=TEST_LOCATION,
@@ -291,8 +343,8 @@ class TestCloudDataCatalogCreateTagTemplateOperator(TestCase):
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
-        ti = mock.MagicMock()
-        result = task.execute(context=Context(task_instance=ti))
+        context = mock.MagicMock()
+        result = task.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -306,7 +358,15 @@ class TestCloudDataCatalogCreateTagTemplateOperator(TestCase):
             timeout=TEST_TIMEOUT,
             metadata=TEST_METADATA,
         )
-        ti.xcom_push.assert_called_once_with(key="tag_template_id", value=TEST_TAG_TEMPLATE_ID)
+        mock_xcom.assert_called_with(
+            context,
+            key="data_catalog_tag_template",
+            value={
+                "tag_template_id": TEST_TAG_TEMPLATE_ID,
+                "location_id": TEST_LOCATION,
+                "project_id": TEST_PROJECT_ID,
+            },
+        )
         assert TEST_TAG_TEMPLATE_DICT == result
 
 
@@ -315,7 +375,8 @@ class TestCloudDataCatalogCreateTagTemplateFieldOperator(TestCase):
         "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
         **{"return_value.create_tag_template_field.return_value": TEST_TAG_TEMPLATE_FIELD},  # type: ignore
     )
-    def test_assert_valid_hook_call(self, mock_hook) -> None:
+    @mock.patch(BASE_PATH.format("CloudDataCatalogCreateTagTemplateFieldOperator.xcom_push"))
+    def test_assert_valid_hook_call(self, mock_xcom, mock_hook) -> None:
         task = CloudDataCatalogCreateTagTemplateFieldOperator(
             task_id="task_id",
             location=TEST_LOCATION,
@@ -329,8 +390,8 @@ class TestCloudDataCatalogCreateTagTemplateFieldOperator(TestCase):
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
         )
-        ti = mock.MagicMock()
-        result = task.execute(context=Context(task_instance=ti))
+        context = mock.MagicMock()
+        result = task.execute(context=context)
         mock_hook.assert_called_once_with(
             gcp_conn_id=TEST_GCP_CONN_ID,
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -345,7 +406,15 @@ class TestCloudDataCatalogCreateTagTemplateFieldOperator(TestCase):
             timeout=TEST_TIMEOUT,
             metadata=TEST_METADATA,
         )
-        ti.xcom_push.assert_called_once_with(key="tag_template_field_id", value=TEST_TAG_TEMPLATE_FIELD_ID)
+        mock_xcom.assert_called_with(
+            context,
+            key="data_catalog_tag_template",
+            value={
+                "tag_template_id": TEST_TAG_TEMPLATE_ID,
+                "location_id": TEST_LOCATION,
+                "project_id": TEST_PROJECT_ID,
+            },
+        )
         assert TEST_TAG_TEMPLATE_FIELD_DICT == result
 
 
@@ -739,6 +808,12 @@ class TestCloudDataCatalogSearchCatalogOperator(TestCase):
 class TestCloudDataCatalogUpdateEntryOperator(TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
     def test_assert_valid_hook_call(self, mock_hook) -> None:
+        mock_hook.return_value.update_entry.return_value.name = TEST_ENTRY_LINK.format(
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            entry_group_id=TEST_ENTRY_GROUP_ID,
+            entry_id=TEST_ENTRY_ID,
+        )
         task = CloudDataCatalogUpdateEntryOperator(
             task_id="task_id",
             entry=TEST_ENTRY,
@@ -774,6 +849,12 @@ class TestCloudDataCatalogUpdateEntryOperator(TestCase):
 class TestCloudDataCatalogUpdateTagOperator(TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
     def test_assert_valid_hook_call(self, mock_hook) -> None:
+        mock_hook.return_value.update_tag.return_value.name = TEST_ENTRY_LINK.format(
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            entry_group_id=TEST_ENTRY_GROUP_ID,
+            entry_id=TEST_ENTRY_ID,
+        )
         task = CloudDataCatalogUpdateTagOperator(
             task_id="task_id",
             tag=Tag(name=TEST_TAG_ID),
@@ -811,6 +892,11 @@ class TestCloudDataCatalogUpdateTagOperator(TestCase):
 class TestCloudDataCatalogUpdateTagTemplateOperator(TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
     def test_assert_valid_hook_call(self, mock_hook) -> None:
+        mock_hook.return_value.update_tag_template.return_value.name = TEST_TAG_TEMPLATE_LINK.format(
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            tag_template_id=TEST_TAG_TEMPLATE_ID,
+        )
         task = CloudDataCatalogUpdateTagTemplateOperator(
             task_id="task_id",
             tag_template=TagTemplate(name=TEST_TAG_TEMPLATE_ID),
@@ -844,6 +930,14 @@ class TestCloudDataCatalogUpdateTagTemplateOperator(TestCase):
 class TestCloudDataCatalogUpdateTagTemplateFieldOperator(TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
     def test_assert_valid_hook_call(self, mock_hook) -> None:
+        mock_hook.return_value.update_tag_template_field.return_value.name = (
+            TEST_TAG_TEMPLATE_FIELD_LINK.format(
+                project_id=TEST_PROJECT_ID,
+                location=TEST_LOCATION,
+                tag_template_id=TEST_TAG_TEMPLATE_ID,
+                tag_template_field_id=TEST_TAG_TEMPLATE_FIELD_ID,
+            )
+        )
         task = CloudDataCatalogUpdateTagTemplateFieldOperator(
             task_id="task_id",
             tag_template_field=TEST_TAG_TEMPLATE_FIELD,
diff --git a/tests/providers/google/cloud/operators/test_datacatalog_system.py b/tests/system/providers/google/datacatalog/__init__.py
similarity index 57%
rename from tests/providers/google/cloud/operators/test_datacatalog_system.py
rename to tests/system/providers/google/datacatalog/__init__.py
index 00e1ce0b8e..13a83393a9 100644
--- a/tests/providers/google/cloud/operators/test_datacatalog_system.py
+++ b/tests/system/providers/google/datacatalog/__init__.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,20 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import pytest
-
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_DATACATALOG_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
-@pytest.mark.credential_file(GCP_DATACATALOG_KEY)
-class CloudDataCatalogExampleDagsSystemTest(GoogleSystemTest):
-    def setUp(self):
-        super().setUp()
-
-    @provide_gcp_context(GCP_DATACATALOG_KEY)
-    def test_run_example_gcp_dataflow_native_java(self):
-        self.run_dag('example_gcp_datacatalog', CLOUD_DAG_FOLDER)
-
-    def tearDown(self):
-        super().tearDown()
diff --git a/tests/system/providers/google/datacatalog/example_datacatalog_entries.py b/tests/system/providers/google/datacatalog/example_datacatalog_entries.py
new file mode 100644
index 0000000000..cc7430452b
--- /dev/null
+++ b/tests/system/providers/google/datacatalog/example_datacatalog_entries.py
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.datacatalog import (
+    CloudDataCatalogCreateEntryGroupOperator,
+    CloudDataCatalogCreateEntryOperator,
+    CloudDataCatalogDeleteEntryGroupOperator,
+    CloudDataCatalogDeleteEntryOperator,
+    CloudDataCatalogGetEntryGroupOperator,
+    CloudDataCatalogGetEntryOperator,
+    CloudDataCatalogLookupEntryOperator,
+    CloudDataCatalogUpdateEntryOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datacatalog_entries"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+LOCATION = "us-central1"
+ENTRY_GROUP_ID = f"id_{DAG_ID}_{ENV_ID}"
+ENTRY_GROUP_NAME = f"name {DAG_ID} {ENV_ID}"
+ENTRY_ID = "python_files"
+ENTRY_NAME = "Wizard"
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
+    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
+
+    # Create
+    # [START howto_operator_gcp_datacatalog_create_entry_group]
+    create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
+        task_id="create_entry_group",
+        location=LOCATION,
+        entry_group_id=ENTRY_GROUP_ID,
+        entry_group={"display_name": ENTRY_GROUP_NAME},
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_group]
+
+    # [START howto_operator_gcp_datacatalog_create_entry_group_result]
+    create_entry_group_result = BashOperator(
+        task_id="create_entry_group_result",
+        bash_command=f"echo {create_entry_group.output['entry_group_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_group_result]
+
+    # [START howto_operator_gcp_datacatalog_create_entry_gcs]
+    create_entry_gcs = CloudDataCatalogCreateEntryOperator(
+        task_id="create_entry_gcs",
+        location=LOCATION,
+        entry_group=ENTRY_GROUP_ID,
+        entry_id=ENTRY_ID,
+        entry={
+            "display_name": ENTRY_NAME,
+            "type_": "FILESET",
+            "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
+        },
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_gcs]
+
+    # [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
+    create_entry_gcs_result = BashOperator(
+        task_id="create_entry_gcs_result",
+        bash_command=f"echo {create_entry_gcs.output['entry_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_gcs_result]
+
+    # Get
+    # [START howto_operator_gcp_datacatalog_get_entry_group]
+    get_entry_group = CloudDataCatalogGetEntryGroupOperator(
+        task_id="get_entry_group",
+        location=LOCATION,
+        entry_group=ENTRY_GROUP_ID,
+        read_mask=FieldMask(paths=["name", "display_name"]),
+    )
+    # [END howto_operator_gcp_datacatalog_get_entry_group]
+
+    # [START howto_operator_gcp_datacatalog_get_entry_group_result]
+    get_entry_group_result = BashOperator(
+        task_id="get_entry_group_result",
+        bash_command=f"echo {get_entry_group.output}",
+    )
+    # [END howto_operator_gcp_datacatalog_get_entry_group_result]
+
+    # [START howto_operator_gcp_datacatalog_get_entry]
+    get_entry = CloudDataCatalogGetEntryOperator(
+        task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+    )
+    # [END howto_operator_gcp_datacatalog_get_entry]
+
+    # [START howto_operator_gcp_datacatalog_get_entry_result]
+    get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")
+    # [END howto_operator_gcp_datacatalog_get_entry_result]
+
+    # Lookup
+    # [START howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
+    current_entry_template = (
+        "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
+        "entryGroups/{entry_group}/entries/{entry}"
+    )
+    lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
+        task_id="lookup_entry",
+        linked_resource=current_entry_template.format(
+            project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+        ),
+    )
+    # [END howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
+
+    # [START howto_operator_gcp_datacatalog_lookup_entry_result]
+    lookup_entry_result = BashOperator(
+        task_id="lookup_entry_result",
+        bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
+    )
+    # [END howto_operator_gcp_datacatalog_lookup_entry_result]
+
+    # Update
+    # [START howto_operator_gcp_datacatalog_update_entry]
+    update_entry = CloudDataCatalogUpdateEntryOperator(
+        task_id="update_entry",
+        entry={"display_name": f"{ENTRY_NAME} UPDATED"},
+        update_mask={"paths": ["display_name"]},
+        location=LOCATION,
+        entry_group=ENTRY_GROUP_ID,
+        entry_id=ENTRY_ID,
+    )
+    # [END howto_operator_gcp_datacatalog_update_entry]
+
+    # Delete
+    # [START howto_operator_gcp_datacatalog_delete_entry]
+    delete_entry = CloudDataCatalogDeleteEntryOperator(
+        task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+    )
+    # [END howto_operator_gcp_datacatalog_delete_entry]
+    delete_entry.trigger_rule = TriggerRule.ALL_DONE
+
+    # [START howto_operator_gcp_datacatalog_delete_entry_group]
+    delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
+        task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
+    )
+    # [END howto_operator_gcp_datacatalog_delete_entry_group]
+    delete_entry_group.trigger_rule = TriggerRule.ALL_DONE
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        # TEST BODY
+        >> create_entry_group
+        >> create_entry_group_result
+        >> get_entry_group
+        >> get_entry_group_result
+        >> create_entry_gcs
+        >> create_entry_gcs_result
+        >> get_entry
+        >> get_entry_result
+        >> lookup_entry_linked_resource
+        >> lookup_entry_result
+        >> update_entry
+        >> delete_entry
+        >> delete_entry_group
+        # TEST TEARDOWN
+        >> delete_bucket
+    )
+
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py b/tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py
new file mode 100644
index 0000000000..e12ee63c9b
--- /dev/null
+++ b/tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py
@@ -0,0 +1,227 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from google.cloud.datacatalog import TagField, TagTemplateField
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.datacatalog import (
+    CloudDataCatalogCreateEntryGroupOperator,
+    CloudDataCatalogCreateEntryOperator,
+    CloudDataCatalogCreateTagOperator,
+    CloudDataCatalogCreateTagTemplateOperator,
+    CloudDataCatalogDeleteEntryGroupOperator,
+    CloudDataCatalogDeleteEntryOperator,
+    CloudDataCatalogDeleteTagOperator,
+    CloudDataCatalogDeleteTagTemplateOperator,
+    CloudDataCatalogSearchCatalogOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datacatalog_search_catalog"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+LOCATION = "us-central1"
+ENTRY_GROUP_ID = f"id_{DAG_ID}_{ENV_ID}"
+ENTRY_GROUP_NAME = f"name {DAG_ID} {ENV_ID}"
+ENTRY_ID = "python_files"
+ENTRY_NAME = "Wizard"
+TEMPLATE_ID = "template_id"
+TAG_TEMPLATE_DISPLAY_NAME = f"Data Catalog {DAG_ID} {ENV_ID}"
+FIELD_NAME_1 = "first"
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
+    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
+
+    # Create
+    # [START howto_operator_gcp_datacatalog_create_entry_group]
+    create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
+        task_id="create_entry_group",
+        location=LOCATION,
+        entry_group_id=ENTRY_GROUP_ID,
+        entry_group={"display_name": ENTRY_GROUP_NAME},
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_group]
+
+    # [START howto_operator_gcp_datacatalog_create_entry_group_result]
+    create_entry_group_result = BashOperator(
+        task_id="create_entry_group_result",
+        bash_command=f"echo {create_entry_group.output['entry_group_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_group_result]
+
+    # [START howto_operator_gcp_datacatalog_create_entry_gcs]
+    create_entry_gcs = CloudDataCatalogCreateEntryOperator(
+        task_id="create_entry_gcs",
+        location=LOCATION,
+        entry_group=ENTRY_GROUP_ID,
+        entry_id=ENTRY_ID,
+        entry={
+            "display_name": ENTRY_NAME,
+            "type_": "FILESET",
+            "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
+        },
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_gcs]
+
+    # [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
+    create_entry_gcs_result = BashOperator(
+        task_id="create_entry_gcs_result",
+        bash_command=f"echo {create_entry_gcs.output['entry_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_gcs_result]
+
+    # [START howto_operator_gcp_datacatalog_create_tag]
+    create_tag = CloudDataCatalogCreateTagOperator(
+        task_id="create_tag",
+        location=LOCATION,
+        entry_group=ENTRY_GROUP_ID,
+        entry=ENTRY_ID,
+        template_id=TEMPLATE_ID,
+        tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag]
+
+    # [START howto_operator_gcp_datacatalog_create_tag_result]
+    create_tag_result = BashOperator(
+        task_id="create_tag_result",
+        bash_command=f"echo {create_tag.output['tag_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag_result]
+
+    # [START howto_operator_gcp_datacatalog_create_tag_template]
+    create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
+        task_id="create_tag_template",
+        location=LOCATION,
+        tag_template_id=TEMPLATE_ID,
+        tag_template={
+            "display_name": TAG_TEMPLATE_DISPLAY_NAME,
+            "fields": {
+                FIELD_NAME_1: TagTemplateField(
+                    display_name="first-field", type_=dict(primitive_type="STRING")
+                )
+            },
+        },
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag_template]
+
+    # [START howto_operator_gcp_datacatalog_create_tag_template_result]
+    create_tag_template_result = BashOperator(
+        task_id="create_tag_template_result",
+        bash_command=f"echo {create_tag_template.output['tag_template_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag_template_result]
+
+    # Search
+    # [START howto_operator_gcp_datacatalog_search_catalog]
+    search_catalog = CloudDataCatalogSearchCatalogOperator(
+        task_id="search_catalog", scope={"include_project_ids": [PROJECT_ID]}, query=f"projectid:{PROJECT_ID}"
+    )
+    # [END howto_operator_gcp_datacatalog_search_catalog]
+
+    # [START howto_operator_gcp_datacatalog_search_catalog_result]
+    search_catalog_result = BashOperator(
+        task_id="search_catalog_result",
+        bash_command=f"echo {search_catalog.output}",
+    )
+    # [END howto_operator_gcp_datacatalog_search_catalog_result]
+
+    # Delete
+    # [START howto_operator_gcp_datacatalog_delete_entry]
+    delete_entry = CloudDataCatalogDeleteEntryOperator(
+        task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+    )
+    # [END howto_operator_gcp_datacatalog_delete_entry]
+    delete_entry.trigger_rule = TriggerRule.ALL_DONE
+
+    # [START howto_operator_gcp_datacatalog_delete_entry_group]
+    delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
+        task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
+    )
+    # [END howto_operator_gcp_datacatalog_delete_entry_group]
+    delete_entry_group.trigger_rule = TriggerRule.ALL_DONE
+
+    # [START howto_operator_gcp_datacatalog_delete_tag]
+    delete_tag = CloudDataCatalogDeleteTagOperator(
+        task_id="delete_tag",
+        location=LOCATION,
+        entry_group=ENTRY_GROUP_ID,
+        entry=ENTRY_ID,
+        tag=create_tag.output["tag_id"],
+    )
+    # [END howto_operator_gcp_datacatalog_delete_tag]
+    delete_tag.trigger_rule = TriggerRule.ALL_DONE
+
+    # [START howto_operator_gcp_datacatalog_delete_tag_template]
+    delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
+        task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
+    )
+    # [END howto_operator_gcp_datacatalog_delete_tag_template]
+    delete_tag_template.trigger_rule = TriggerRule.ALL_DONE
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        # TEST BODY
+        >> create_entry_group
+        >> create_entry_group_result
+        >> create_entry_gcs
+        >> create_entry_gcs_result
+        >> create_tag_template
+        >> create_tag_template_result
+        >> create_tag
+        >> create_tag_result
+        >> search_catalog
+        >> search_catalog_result
+        >> delete_tag
+        >> delete_tag_template
+        >> delete_entry
+        >> delete_entry_group
+        # TEST TEARDOWN
+        >> delete_bucket
+    )
+
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py b/tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
new file mode 100644
index 0000000000..291af290da
--- /dev/null
+++ b/tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
@@ -0,0 +1,192 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from google.cloud.datacatalog import FieldType, TagTemplateField
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.datacatalog import (
+    CloudDataCatalogCreateTagTemplateFieldOperator,
+    CloudDataCatalogCreateTagTemplateOperator,
+    CloudDataCatalogDeleteTagTemplateFieldOperator,
+    CloudDataCatalogDeleteTagTemplateOperator,
+    CloudDataCatalogGetTagTemplateOperator,
+    CloudDataCatalogRenameTagTemplateFieldOperator,
+    CloudDataCatalogUpdateTagTemplateFieldOperator,
+    CloudDataCatalogUpdateTagTemplateOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datacatalog_tag_templates"
+
+LOCATION = "us-central1"
+TEMPLATE_ID = "template_id"
+TAG_TEMPLATE_DISPLAY_NAME = f"Data Catalog {DAG_ID} {ENV_ID}"
+FIELD_NAME_1 = "first"
+FIELD_NAME_2 = "second"
+FIELD_NAME_3 = "first-rename"
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
+    # Create
+    # [START howto_operator_gcp_datacatalog_create_tag_template]
+    create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
+        task_id="create_tag_template",
+        location=LOCATION,
+        tag_template_id=TEMPLATE_ID,
+        tag_template={
+            "display_name": TAG_TEMPLATE_DISPLAY_NAME,
+            "fields": {
+                FIELD_NAME_1: TagTemplateField(
+                    display_name="first-field", type_=dict(primitive_type="STRING")
+                )
+            },
+        },
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag_template]
+
+    # [START howto_operator_gcp_datacatalog_create_tag_template_result]
+    create_tag_template_result = BashOperator(
+        task_id="create_tag_template_result",
+        bash_command=f"echo {create_tag_template.output['tag_template_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag_template_result]
+
+    # [START howto_operator_gcp_datacatalog_create_tag_template_field]
+    create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
+        task_id="create_tag_template_field",
+        location=LOCATION,
+        tag_template=TEMPLATE_ID,
+        tag_template_field_id=FIELD_NAME_2,
+        tag_template_field=TagTemplateField(
+            display_name="second-field", type_=FieldType(primitive_type="STRING")
+        ),
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag_template_field]
+
+    # [START howto_operator_gcp_datacatalog_create_tag_template_field_result]
+    create_tag_template_field_result = BashOperator(
+        task_id="create_tag_template_field_result",
+        bash_command=f"echo {create_tag_template_field.output['tag_template_field_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag_template_field_result]
+
+    # Get
+    # [START howto_operator_gcp_datacatalog_get_tag_template]
+    get_tag_template = CloudDataCatalogGetTagTemplateOperator(
+        task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
+    )
+    # [END howto_operator_gcp_datacatalog_get_tag_template]
+
+    # [START howto_operator_gcp_datacatalog_get_tag_template_result]
+    get_tag_template_result = BashOperator(
+        task_id="get_tag_template_result",
+        bash_command=f"echo {get_tag_template.output}",
+    )
+    # [END howto_operator_gcp_datacatalog_get_tag_template_result]
+
+    # Rename
+    # [START howto_operator_gcp_datacatalog_rename_tag_template_field]
+    rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
+        task_id="rename_tag_template_field",
+        location=LOCATION,
+        tag_template=TEMPLATE_ID,
+        field=FIELD_NAME_1,
+        new_tag_template_field_id=FIELD_NAME_3,
+    )
+    # [END howto_operator_gcp_datacatalog_rename_tag_template_field]
+
+    # Update
+    # [START howto_operator_gcp_datacatalog_update_tag_template]
+    update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
+        task_id="update_tag_template",
+        tag_template={"display_name": f"{TAG_TEMPLATE_DISPLAY_NAME} UPDATED"},
+        update_mask={"paths": ["display_name"]},
+        location=LOCATION,
+        tag_template_id=TEMPLATE_ID,
+    )
+    # [END howto_operator_gcp_datacatalog_update_tag_template]
+
+    # [START howto_operator_gcp_datacatalog_update_tag_template_field]
+    update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
+        task_id="update_tag_template_field",
+        tag_template_field={"display_name": "Updated template field"},
+        update_mask={"paths": ["display_name"]},
+        location=LOCATION,
+        tag_template=TEMPLATE_ID,
+        tag_template_field_id=FIELD_NAME_1,
+    )
+    # [END howto_operator_gcp_datacatalog_update_tag_template_field]
+
+    # Delete
+    # [START howto_operator_gcp_datacatalog_delete_tag_template_field]
+    delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
+        task_id="delete_tag_template_field",
+        location=LOCATION,
+        tag_template=TEMPLATE_ID,
+        field=FIELD_NAME_2,
+        force=True,
+    )
+    # [END howto_operator_gcp_datacatalog_delete_tag_template_field]
+    delete_tag_template_field.trigger_rule = TriggerRule.ALL_DONE
+
+    # [START howto_operator_gcp_datacatalog_delete_tag_template]
+    delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
+        task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
+    )
+    # [END howto_operator_gcp_datacatalog_delete_tag_template]
+    delete_tag_template.trigger_rule = TriggerRule.ALL_DONE
+
+    (
+        # TEST BODY
+        create_tag_template
+        >> create_tag_template_result
+        >> create_tag_template_field
+        >> create_tag_template_field_result
+        >> get_tag_template
+        >> get_tag_template_result
+        >> update_tag_template
+        >> update_tag_template_field
+        >> rename_tag_template_field
+        >> delete_tag_template_field
+        >> delete_tag_template
+    )
+
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/datacatalog/example_datacatalog_tags.py b/tests/system/providers/google/datacatalog/example_datacatalog_tags.py
new file mode 100644
index 0000000000..20eeb3895a
--- /dev/null
+++ b/tests/system/providers/google/datacatalog/example_datacatalog_tags.py
@@ -0,0 +1,239 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from google.cloud.datacatalog import TagField, TagTemplateField
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.datacatalog import (
+    CloudDataCatalogCreateEntryGroupOperator,
+    CloudDataCatalogCreateEntryOperator,
+    CloudDataCatalogCreateTagOperator,
+    CloudDataCatalogCreateTagTemplateOperator,
+    CloudDataCatalogDeleteEntryGroupOperator,
+    CloudDataCatalogDeleteEntryOperator,
+    CloudDataCatalogDeleteTagOperator,
+    CloudDataCatalogDeleteTagTemplateOperator,
+    CloudDataCatalogListTagsOperator,
+    CloudDataCatalogUpdateTagOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datacatalog_tags"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+LOCATION = "us-central1"
+ENTRY_GROUP_ID = f"id_{DAG_ID}_{ENV_ID}"
+ENTRY_GROUP_NAME = f"name {DAG_ID} {ENV_ID}"
+ENTRY_ID = "python_files"
+ENTRY_NAME = "Wizard"
+TEMPLATE_ID = "template_id"
+TAG_TEMPLATE_DISPLAY_NAME = f"Data Catalog {DAG_ID} {ENV_ID}"
+FIELD_NAME_1 = "first"
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
+    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
+
+    # Create
+    # [START howto_operator_gcp_datacatalog_create_entry_group]
+    create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
+        task_id="create_entry_group",
+        location=LOCATION,
+        entry_group_id=ENTRY_GROUP_ID,
+        entry_group={"display_name": ENTRY_GROUP_NAME},
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_group]
+
+    # [START howto_operator_gcp_datacatalog_create_entry_group_result]
+    create_entry_group_result = BashOperator(
+        task_id="create_entry_group_result",
+        bash_command=f"echo {create_entry_group.output['entry_group_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_group_result]
+
+    # [START howto_operator_gcp_datacatalog_create_entry_gcs]
+    create_entry_gcs = CloudDataCatalogCreateEntryOperator(
+        task_id="create_entry_gcs",
+        location=LOCATION,
+        entry_group=ENTRY_GROUP_ID,
+        entry_id=ENTRY_ID,
+        entry={
+            "display_name": ENTRY_NAME,
+            "type_": "FILESET",
+            "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
+        },
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_gcs]
+
+    # [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
+    create_entry_gcs_result = BashOperator(
+        task_id="create_entry_gcs_result",
+        bash_command=f"echo {create_entry_gcs.output['entry_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_entry_gcs_result]
+
+    # [START howto_operator_gcp_datacatalog_create_tag]
+    create_tag = CloudDataCatalogCreateTagOperator(
+        task_id="create_tag",
+        location=LOCATION,
+        entry_group=ENTRY_GROUP_ID,
+        entry=ENTRY_ID,
+        template_id=TEMPLATE_ID,
+        tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag]
+
+    # [START howto_operator_gcp_datacatalog_create_tag_result]
+    create_tag_result = BashOperator(
+        task_id="create_tag_result",
+        bash_command=f"echo {create_tag.output['tag_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag_result]
+
+    # [START howto_operator_gcp_datacatalog_create_tag_template]
+    create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
+        task_id="create_tag_template",
+        location=LOCATION,
+        tag_template_id=TEMPLATE_ID,
+        tag_template={
+            "display_name": TAG_TEMPLATE_DISPLAY_NAME,
+            "fields": {
+                FIELD_NAME_1: TagTemplateField(
+                    display_name="first-field", type_=dict(primitive_type="STRING")
+                )
+            },
+        },
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag_template]
+
+    # [START howto_operator_gcp_datacatalog_create_tag_template_result]
+    create_tag_template_result = BashOperator(
+        task_id="create_tag_template_result",
+        bash_command=f"echo {create_tag_template.output['tag_template_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_create_tag_template_result]
+
+    # List
+    # [START howto_operator_gcp_datacatalog_list_tags]
+    list_tags = CloudDataCatalogListTagsOperator(
+        task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+    )
+    # [END howto_operator_gcp_datacatalog_list_tags]
+
+    # [START howto_operator_gcp_datacatalog_list_tags_result]
+    list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")
+    # [END howto_operator_gcp_datacatalog_list_tags_result]
+
+    # Update
+    # [START howto_operator_gcp_datacatalog_update_tag]
+    update_tag = CloudDataCatalogUpdateTagOperator(
+        task_id="update_tag",
+        tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
+        update_mask={"paths": ["fields"]},
+        location=LOCATION,
+        entry_group=ENTRY_GROUP_ID,
+        entry=ENTRY_ID,
+        tag_id=f"{create_tag.output['tag_id']}",
+    )
+    # [END howto_operator_gcp_datacatalog_update_tag]
+
+    # # Delete
+    # [START howto_operator_gcp_datacatalog_delete_entry]
+    delete_entry = CloudDataCatalogDeleteEntryOperator(
+        task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+    )
+    # [END howto_operator_gcp_datacatalog_delete_entry]
+    delete_entry.trigger_rule = TriggerRule.ALL_DONE
+
+    # [START howto_operator_gcp_datacatalog_delete_entry_group]
+    delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
+        task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
+    )
+    # [END howto_operator_gcp_datacatalog_delete_entry_group]
+    delete_entry_group.trigger_rule = TriggerRule.ALL_DONE
+
+    # [START howto_operator_gcp_datacatalog_delete_tag]
+    delete_tag = CloudDataCatalogDeleteTagOperator(
+        task_id="delete_tag",
+        location=LOCATION,
+        entry_group=ENTRY_GROUP_ID,
+        entry=ENTRY_ID,
+        tag=create_tag.output["tag_id"],
+    )
+    # [END howto_operator_gcp_datacatalog_delete_tag]
+    delete_tag.trigger_rule = TriggerRule.ALL_DONE
+
+    # [START howto_operator_gcp_datacatalog_delete_tag_template]
+    delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
+        task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
+    )
+    # [END howto_operator_gcp_datacatalog_delete_tag_template]
+    delete_tag_template.trigger_rule = TriggerRule.ALL_DONE
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        # TEST BODY
+        >> create_entry_group
+        >> create_entry_group_result
+        >> create_entry_gcs
+        >> create_entry_gcs_result
+        >> create_tag_template
+        >> create_tag_template_result
+        >> create_tag
+        >> create_tag_result
+        >> list_tags
+        >> list_tags_result
+        >> update_tag
+        >> delete_tag
+        >> delete_tag_template
+        >> delete_entry
+        >> delete_entry_group
+        # TEST TEARDOWN
+        >> delete_bucket
+    )
+
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)