You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/04 21:40:22 UTC
[airflow] branch main updated: Datacatalog assets & system tests migration (AIP-47) (#24600)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9227d56e68 Datacatalog assets & system tests migration (AIP-47) (#24600)
9227d56e68 is described below
commit 9227d56e681a40e9caf2eefae87f7046c0d0c9f4
Author: Wojciech Januszek <wj...@sigma.ug.edu.pl>
AuthorDate: Mon Jul 4 23:39:59 2022 +0200
Datacatalog assets & system tests migration (AIP-47) (#24600)
---
.../cloud/example_dags/example_datacatalog.py | 452 ---------------------
.../providers/google/cloud/links/datacatalog.py | 112 +++++
.../google/cloud/operators/datacatalog.py | 165 +++++++-
airflow/providers/google/provider.yaml | 3 +
.../operators/cloud/datacatalog.rst | 138 +++----
docs/spelling_wordlist.txt | 1 +
.../google/cloud/operators/test_datacatalog.py | 148 +++++--
.../providers/google/datacatalog/__init__.py} | 18 -
.../datacatalog/example_datacatalog_entries.py | 209 ++++++++++
.../example_datacatalog_search_catalog.py | 227 +++++++++++
.../example_datacatalog_tag_templates.py | 192 +++++++++
.../google/datacatalog/example_datacatalog_tags.py | 239 +++++++++++
12 files changed, 1312 insertions(+), 592 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_datacatalog.py b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
deleted file mode 100644
index 848cd5ef90..0000000000
--- a/airflow/providers/google/cloud/example_dags/example_datacatalog.py
+++ /dev/null
@@ -1,452 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG that interacts with Google Data Catalog service
-"""
-import os
-from datetime import datetime
-
-from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField
-from google.protobuf.field_mask_pb2 import FieldMask
-
-from airflow import models
-from airflow.models.baseoperator import chain
-from airflow.operators.bash import BashOperator
-from airflow.providers.google.cloud.operators.datacatalog import (
- CloudDataCatalogCreateEntryGroupOperator,
- CloudDataCatalogCreateEntryOperator,
- CloudDataCatalogCreateTagOperator,
- CloudDataCatalogCreateTagTemplateFieldOperator,
- CloudDataCatalogCreateTagTemplateOperator,
- CloudDataCatalogDeleteEntryGroupOperator,
- CloudDataCatalogDeleteEntryOperator,
- CloudDataCatalogDeleteTagOperator,
- CloudDataCatalogDeleteTagTemplateFieldOperator,
- CloudDataCatalogDeleteTagTemplateOperator,
- CloudDataCatalogGetEntryGroupOperator,
- CloudDataCatalogGetEntryOperator,
- CloudDataCatalogGetTagTemplateOperator,
- CloudDataCatalogListTagsOperator,
- CloudDataCatalogLookupEntryOperator,
- CloudDataCatalogRenameTagTemplateFieldOperator,
- CloudDataCatalogSearchCatalogOperator,
- CloudDataCatalogUpdateEntryOperator,
- CloudDataCatalogUpdateTagOperator,
- CloudDataCatalogUpdateTagTemplateFieldOperator,
- CloudDataCatalogUpdateTagTemplateOperator,
-)
-
-PROJECT_ID = os.getenv("GCP_PROJECT_ID")
-BUCKET_ID = os.getenv("GCP_TEST_DATA_BUCKET", "INVALID BUCKET NAME")
-LOCATION = "us-central1"
-ENTRY_GROUP_ID = "important_data_jan_2019"
-ENTRY_ID = "python_files"
-TEMPLATE_ID = "template_id"
-FIELD_NAME_1 = "first"
-FIELD_NAME_2 = "second"
-FIELD_NAME_3 = "first-rename"
-
-with models.DAG(
- "example_gcp_datacatalog",
- schedule_interval='@once',
- start_date=datetime(2021, 1, 1),
- catchup=False,
-) as dag:
- # Create
- # [START howto_operator_gcp_datacatalog_create_entry_group]
- create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
- task_id="create_entry_group",
- location=LOCATION,
- entry_group_id=ENTRY_GROUP_ID,
- entry_group={"display_name": "analytics data - jan 2011"},
- )
- # [END howto_operator_gcp_datacatalog_create_entry_group]
-
- # [START howto_operator_gcp_datacatalog_create_entry_group_result]
- create_entry_group_result = BashOperator(
- task_id="create_entry_group_result",
- bash_command=f"echo {create_entry_group.output['entry_group_id']}",
- )
- # [END howto_operator_gcp_datacatalog_create_entry_group_result]
-
- # [START howto_operator_gcp_datacatalog_create_entry_group_result2]
- create_entry_group_result2 = BashOperator(
- task_id="create_entry_group_result2",
- bash_command=f"echo {create_entry_group.output}",
- )
- # [END howto_operator_gcp_datacatalog_create_entry_group_result2]
-
- # [START howto_operator_gcp_datacatalog_create_entry_gcs]
- create_entry_gcs = CloudDataCatalogCreateEntryOperator(
- task_id="create_entry_gcs",
- location=LOCATION,
- entry_group=ENTRY_GROUP_ID,
- entry_id=ENTRY_ID,
- entry={
- "display_name": "Wizard",
- "type_": "FILESET",
- "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_ID}/**"]},
- },
- )
- # [END howto_operator_gcp_datacatalog_create_entry_gcs]
-
- # [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
- create_entry_gcs_result = BashOperator(
- task_id="create_entry_gcs_result",
- bash_command=f"echo {create_entry_gcs.output['entry_id']}",
- )
- # [END howto_operator_gcp_datacatalog_create_entry_gcs_result]
-
- # [START howto_operator_gcp_datacatalog_create_entry_gcs_result2]
- create_entry_gcs_result2 = BashOperator(
- task_id="create_entry_gcs_result2",
- bash_command=f"echo {create_entry_gcs.output}",
- )
- # [END howto_operator_gcp_datacatalog_create_entry_gcs_result2]
-
- # [START howto_operator_gcp_datacatalog_create_tag]
- create_tag = CloudDataCatalogCreateTagOperator(
- task_id="create_tag",
- location=LOCATION,
- entry_group=ENTRY_GROUP_ID,
- entry=ENTRY_ID,
- template_id=TEMPLATE_ID,
- tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
- )
- # [END howto_operator_gcp_datacatalog_create_tag]
-
- # [START howto_operator_gcp_datacatalog_create_tag_result]
- create_tag_result = BashOperator(
- task_id="create_tag_result",
- bash_command=f"echo {create_tag.output['tag_id']}",
- )
- # [END howto_operator_gcp_datacatalog_create_tag_result]
-
- # [START howto_operator_gcp_datacatalog_create_tag_result2]
- create_tag_result2 = BashOperator(task_id="create_tag_result2", bash_command=f"echo {create_tag.output}")
- # [END howto_operator_gcp_datacatalog_create_tag_result2]
-
- # [START howto_operator_gcp_datacatalog_create_tag_template]
- create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
- task_id="create_tag_template",
- location=LOCATION,
- tag_template_id=TEMPLATE_ID,
- tag_template={
- "display_name": "Awesome Tag Template",
- "fields": {
- FIELD_NAME_1: TagTemplateField(
- display_name="first-field", type_=dict(primitive_type="STRING")
- )
- },
- },
- )
- # [END howto_operator_gcp_datacatalog_create_tag_template]
-
- # [START howto_operator_gcp_datacatalog_create_tag_template_result]
- create_tag_template_result = BashOperator(
- task_id="create_tag_template_result",
- bash_command=f"echo {create_tag_template.output['tag_template_id']}",
- )
- # [END howto_operator_gcp_datacatalog_create_tag_template_result]
-
- # [START howto_operator_gcp_datacatalog_create_tag_template_result2]
- create_tag_template_result2 = BashOperator(
- task_id="create_tag_template_result2",
- bash_command=f"echo {create_tag_template.output}",
- )
- # [END howto_operator_gcp_datacatalog_create_tag_template_result2]
-
- # [START howto_operator_gcp_datacatalog_create_tag_template_field]
- create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
- task_id="create_tag_template_field",
- location=LOCATION,
- tag_template=TEMPLATE_ID,
- tag_template_field_id=FIELD_NAME_2,
- tag_template_field=TagTemplateField(
- display_name="second-field", type_=FieldType(primitive_type="STRING")
- ),
- )
- # [END howto_operator_gcp_datacatalog_create_tag_template_field]
-
- # [START howto_operator_gcp_datacatalog_create_tag_template_field_result]
- create_tag_template_field_result = BashOperator(
- task_id="create_tag_template_field_result",
- bash_command=f"echo {create_tag_template_field.output['tag_template_field_id']}",
- )
- # [END howto_operator_gcp_datacatalog_create_tag_template_field_result]
-
- # [START howto_operator_gcp_datacatalog_create_tag_template_field_result2]
- create_tag_template_field_result2 = BashOperator(
- task_id="create_tag_template_field_result2",
- bash_command=f"echo {create_tag_template_field.output}",
- )
- # [END howto_operator_gcp_datacatalog_create_tag_template_field_result2]
-
- # Delete
- # [START howto_operator_gcp_datacatalog_delete_entry]
- delete_entry = CloudDataCatalogDeleteEntryOperator(
- task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
- )
- # [END howto_operator_gcp_datacatalog_delete_entry]
-
- # [START howto_operator_gcp_datacatalog_delete_entry_group]
- delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
- task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
- )
- # [END howto_operator_gcp_datacatalog_delete_entry_group]
-
- # [START howto_operator_gcp_datacatalog_delete_tag]
- delete_tag = CloudDataCatalogDeleteTagOperator(
- task_id="delete_tag",
- location=LOCATION,
- entry_group=ENTRY_GROUP_ID,
- entry=ENTRY_ID,
- tag=create_tag.output["tag_id"],
- )
- # [END howto_operator_gcp_datacatalog_delete_tag]
-
- # [START howto_operator_gcp_datacatalog_delete_tag_template_field]
- delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
- task_id="delete_tag_template_field",
- location=LOCATION,
- tag_template=TEMPLATE_ID,
- field=FIELD_NAME_2,
- force=True,
- )
- # [END howto_operator_gcp_datacatalog_delete_tag_template_field]
-
- # [START howto_operator_gcp_datacatalog_delete_tag_template]
- delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
- task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
- )
- # [END howto_operator_gcp_datacatalog_delete_tag_template]
-
- # Get
- # [START howto_operator_gcp_datacatalog_get_entry_group]
- get_entry_group = CloudDataCatalogGetEntryGroupOperator(
- task_id="get_entry_group",
- location=LOCATION,
- entry_group=ENTRY_GROUP_ID,
- read_mask=FieldMask(paths=["name", "display_name"]),
- )
- # [END howto_operator_gcp_datacatalog_get_entry_group]
-
- # [START howto_operator_gcp_datacatalog_get_entry_group_result]
- get_entry_group_result = BashOperator(
- task_id="get_entry_group_result",
- bash_command=f"echo {get_entry_group.output}",
- )
- # [END howto_operator_gcp_datacatalog_get_entry_group_result]
-
- # [START howto_operator_gcp_datacatalog_get_entry]
- get_entry = CloudDataCatalogGetEntryOperator(
- task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
- )
- # [END howto_operator_gcp_datacatalog_get_entry]
-
- # [START howto_operator_gcp_datacatalog_get_entry_result]
- get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")
- # [END howto_operator_gcp_datacatalog_get_entry_result]
-
- # [START howto_operator_gcp_datacatalog_get_tag_template]
- get_tag_template = CloudDataCatalogGetTagTemplateOperator(
- task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
- )
- # [END howto_operator_gcp_datacatalog_get_tag_template]
-
- # [START howto_operator_gcp_datacatalog_get_tag_template_result]
- get_tag_template_result = BashOperator(
- task_id="get_tag_template_result",
- bash_command=f"{get_tag_template.output}",
- )
- # [END howto_operator_gcp_datacatalog_get_tag_template_result]
-
- # List
- # [START howto_operator_gcp_datacatalog_list_tags]
- list_tags = CloudDataCatalogListTagsOperator(
- task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
- )
- # [END howto_operator_gcp_datacatalog_list_tags]
-
- # [START howto_operator_gcp_datacatalog_list_tags_result]
- list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")
- # [END howto_operator_gcp_datacatalog_list_tags_result]
-
- # Lookup
- # [START howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
- current_entry_template = (
- "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
- "entryGroups/{entry_group}/entries/{entry}"
- )
- lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
- task_id="lookup_entry",
- linked_resource=current_entry_template.format(
- project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
- ),
- )
- # [END howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
-
- # [START howto_operator_gcp_datacatalog_lookup_entry_result]
- lookup_entry_result = BashOperator(
- task_id="lookup_entry_result",
- bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
- )
- # [END howto_operator_gcp_datacatalog_lookup_entry_result]
-
- # Rename
- # [START howto_operator_gcp_datacatalog_rename_tag_template_field]
- rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
- task_id="rename_tag_template_field",
- location=LOCATION,
- tag_template=TEMPLATE_ID,
- field=FIELD_NAME_1,
- new_tag_template_field_id=FIELD_NAME_3,
- )
- # [END howto_operator_gcp_datacatalog_rename_tag_template_field]
-
- # Search
- # [START howto_operator_gcp_datacatalog_search_catalog]
- search_catalog = CloudDataCatalogSearchCatalogOperator(
- task_id="search_catalog", scope={"include_project_ids": [PROJECT_ID]}, query=f"projectid:{PROJECT_ID}"
- )
- # [END howto_operator_gcp_datacatalog_search_catalog]
-
- # [START howto_operator_gcp_datacatalog_search_catalog_result]
- search_catalog_result = BashOperator(
- task_id="search_catalog_result",
- bash_command=f"echo {search_catalog.output}",
- )
- # [END howto_operator_gcp_datacatalog_search_catalog_result]
-
- # Update
- # [START howto_operator_gcp_datacatalog_update_entry]
- update_entry = CloudDataCatalogUpdateEntryOperator(
- task_id="update_entry",
- entry={"display_name": "New Wizard"},
- update_mask={"paths": ["display_name"]},
- location=LOCATION,
- entry_group=ENTRY_GROUP_ID,
- entry_id=ENTRY_ID,
- )
- # [END howto_operator_gcp_datacatalog_update_entry]
-
- # [START howto_operator_gcp_datacatalog_update_tag]
- update_tag = CloudDataCatalogUpdateTagOperator(
- task_id="update_tag",
- tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
- update_mask={"paths": ["fields"]},
- location=LOCATION,
- entry_group=ENTRY_GROUP_ID,
- entry=ENTRY_ID,
- tag_id=f"{create_tag.output['tag_id']}",
- )
- # [END howto_operator_gcp_datacatalog_update_tag]
-
- # [START howto_operator_gcp_datacatalog_update_tag_template]
- update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
- task_id="update_tag_template",
- tag_template={"display_name": "Awesome Tag Template"},
- update_mask={"paths": ["display_name"]},
- location=LOCATION,
- tag_template_id=TEMPLATE_ID,
- )
- # [END howto_operator_gcp_datacatalog_update_tag_template]
-
- # [START howto_operator_gcp_datacatalog_update_tag_template_field]
- update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
- task_id="update_tag_template_field",
- tag_template_field={"display_name": "Updated template field"},
- update_mask={"paths": ["display_name"]},
- location=LOCATION,
- tag_template=TEMPLATE_ID,
- tag_template_field_id=FIELD_NAME_1,
- )
- # [END howto_operator_gcp_datacatalog_update_tag_template_field]
-
- # Create
- create_tasks = [
- create_entry_group,
- create_entry_gcs,
- create_tag_template,
- create_tag_template_field,
- create_tag,
- ]
- chain(*create_tasks)
-
- create_entry_group >> delete_entry_group
- create_entry_group >> create_entry_group_result
- create_entry_group >> create_entry_group_result2
-
- create_entry_gcs >> delete_entry
- create_entry_gcs >> create_entry_gcs_result
- create_entry_gcs >> create_entry_gcs_result2
-
- create_tag_template >> delete_tag_template_field
- create_tag_template >> create_tag_template_result
- create_tag_template >> create_tag_template_result2
-
- create_tag_template_field >> delete_tag_template_field
- create_tag_template_field >> create_tag_template_field_result
- create_tag_template_field >> create_tag_template_field_result2
-
- create_tag >> delete_tag
- create_tag >> create_tag_result
- create_tag >> create_tag_result2
-
- # Delete
- delete_tasks = [
- delete_tag,
- delete_tag_template_field,
- delete_tag_template,
- delete_entry,
- delete_entry_group,
- ]
- chain(*delete_tasks)
-
- # Get
- create_tag_template >> get_tag_template >> delete_tag_template
- get_tag_template >> get_tag_template_result
-
- create_entry_gcs >> get_entry >> delete_entry
- get_entry >> get_entry_result
-
- create_entry_group >> get_entry_group >> delete_entry_group
- get_entry_group >> get_entry_group_result
-
- # List
- create_tag >> list_tags >> delete_tag
- list_tags >> list_tags_result
-
- # Lookup
- create_entry_gcs >> lookup_entry_linked_resource >> delete_entry
- lookup_entry_linked_resource >> lookup_entry_result
-
- # Rename
- update_tag >> rename_tag_template_field
- create_tag_template_field >> rename_tag_template_field >> delete_tag_template_field
-
- # Search
- chain(create_tasks, search_catalog, delete_tasks)
- search_catalog >> search_catalog_result
-
- # Update
- create_entry_gcs >> update_entry >> delete_entry
- create_tag >> update_tag >> delete_tag
- create_tag_template >> update_tag_template >> delete_tag_template
- create_tag_template_field >> update_tag_template_field >> rename_tag_template_field
diff --git a/airflow/providers/google/cloud/links/datacatalog.py b/airflow/providers/google/cloud/links/datacatalog.py
new file mode 100644
index 0000000000..6df0f36d82
--- /dev/null
+++ b/airflow/providers/google/cloud/links/datacatalog.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Data Catalog links."""
+from typing import TYPE_CHECKING, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+DATACATALOG_BASE_LINK = "https://console.cloud.google.com/datacatalog"
+ENTRY_GROUP_LINK = (
+ DATACATALOG_BASE_LINK
+ + "/groups/{entry_group_id};container={project_id};location={location_id}?project={project_id}"
+)
+ENTRY_LINK = (
+ DATACATALOG_BASE_LINK
+ + "/projects/{project_id}/locations/{location_id}/entryGroups/{entry_group_id}/entries/{entry_id}\
+ ?project={project_id}"
+)
+TAG_TEMPLATE_LINK = (
+ DATACATALOG_BASE_LINK
+ + "/projects/{project_id}/locations/{location_id}/tagTemplates/{tag_template_id}?project={project_id}"
+)
+
+
+class DataCatalogEntryGroupLink(BaseGoogleLink):
+ """Helper class for constructing Data Catalog Entry Group Link"""
+
+ name = "Data Catalog Entry Group"
+ key = "data_catalog_entry_group"
+ format_str = ENTRY_GROUP_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance: BaseOperator,
+ entry_group_id: str,
+ location_id: str,
+ project_id: Optional[str],
+ ):
+ task_instance.xcom_push(
+ context,
+ key=DataCatalogEntryGroupLink.key,
+ value={"entry_group_id": entry_group_id, "location_id": location_id, "project_id": project_id},
+ )
+
+
+class DataCatalogEntryLink(BaseGoogleLink):
+ """Helper class for constructing Data Catalog Entry Link"""
+
+ name = "Data Catalog Entry"
+ key = "data_catalog_entry"
+ format_str = ENTRY_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance: BaseOperator,
+ entry_id: str,
+ entry_group_id: str,
+ location_id: str,
+ project_id: Optional[str],
+ ):
+ task_instance.xcom_push(
+ context,
+ key=DataCatalogEntryLink.key,
+ value={
+ "entry_id": entry_id,
+ "entry_group_id": entry_group_id,
+ "location_id": location_id,
+ "project_id": project_id,
+ },
+ )
+
+
+class DataCatalogTagTemplateLink(BaseGoogleLink):
+ """Helper class for constructing Data Catalog Tag Template Link"""
+
+ name = "Data Catalog Tag Template"
+ key = "data_catalog_tag_template"
+ format_str = TAG_TEMPLATE_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance: BaseOperator,
+ tag_template_id: str,
+ location_id: str,
+ project_id: Optional[str],
+ ):
+ task_instance.xcom_push(
+ context,
+ key=DataCatalogTagTemplateLink.key,
+ value={"tag_template_id": tag_template_id, "location_id": location_id, "project_id": project_id},
+ )
diff --git a/airflow/providers/google/cloud/operators/datacatalog.py b/airflow/providers/google/cloud/operators/datacatalog.py
index 145aeaa4c6..20bb980591 100644
--- a/airflow/providers/google/cloud/operators/datacatalog.py
+++ b/airflow/providers/google/cloud/operators/datacatalog.py
@@ -20,11 +20,12 @@ from typing import TYPE_CHECKING, Dict, Optional, Sequence, Tuple, Union
from google.api_core.exceptions import AlreadyExists, NotFound
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry
-from google.cloud.datacatalog_v1beta1 import DataCatalogClient, SearchCatalogResult
-from google.cloud.datacatalog_v1beta1.types import (
+from google.cloud.datacatalog import (
+ DataCatalogClient,
Entry,
EntryGroup,
SearchCatalogRequest,
+ SearchCatalogResult,
Tag,
TagTemplate,
TagTemplateField,
@@ -33,6 +34,11 @@ from google.protobuf.field_mask_pb2 import FieldMask
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.datacatalog import CloudDataCatalogHook
+from airflow.providers.google.cloud.links.datacatalog import (
+ DataCatalogEntryGroupLink,
+ DataCatalogEntryLink,
+ DataCatalogTagTemplateLink,
+)
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -87,6 +93,7 @@ class CloudDataCatalogCreateEntryOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogEntryLink(),)
def __init__(
self,
@@ -143,7 +150,15 @@ class CloudDataCatalogCreateEntryOperator(BaseOperator):
)
_, _, entry_id = result.name.rpartition("/")
self.log.info("Current entry_id ID: %s", entry_id)
- context["task_instance"].xcom_push(key="entry_id", value=entry_id)
+ self.xcom_push(context, key="entry_id", value=entry_id)
+ DataCatalogEntryLink.persist(
+ context=context,
+ task_instance=self,
+ entry_id=self.entry_id,
+ entry_group_id=self.entry_group,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return Entry.to_dict(result)
@@ -195,6 +210,7 @@ class CloudDataCatalogCreateEntryGroupOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogEntryGroupLink(),)
def __init__(
self,
@@ -248,7 +264,14 @@ class CloudDataCatalogCreateEntryGroupOperator(BaseOperator):
_, _, entry_group_id = result.name.rpartition("/")
self.log.info("Current entry group ID: %s", entry_group_id)
- context["task_instance"].xcom_push(key="entry_group_id", value=entry_group_id)
+ self.xcom_push(context, key="entry_group_id", value=entry_group_id)
+ DataCatalogEntryGroupLink.persist(
+ context=context,
+ task_instance=self,
+ entry_group_id=self.entry_group_id,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return EntryGroup.to_dict(result)
@@ -301,6 +324,7 @@ class CloudDataCatalogCreateTagOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogEntryLink(),)
def __init__(
self,
@@ -375,7 +399,15 @@ class CloudDataCatalogCreateTagOperator(BaseOperator):
_, _, tag_id = tag.name.rpartition("/")
self.log.info("Current Tag ID: %s", tag_id)
- context["task_instance"].xcom_push(key="tag_id", value=tag_id)
+ self.xcom_push(context, key="tag_id", value=tag_id)
+ DataCatalogEntryLink.persist(
+ context=context,
+ task_instance=self,
+ entry_id=self.entry,
+ entry_group_id=self.entry_group,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return Tag.to_dict(tag)
@@ -425,6 +457,7 @@ class CloudDataCatalogCreateTagTemplateOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogTagTemplateLink(),)
def __init__(
self,
@@ -477,7 +510,14 @@ class CloudDataCatalogCreateTagTemplateOperator(BaseOperator):
)
_, _, tag_template = result.name.rpartition("/")
self.log.info("Current Tag ID: %s", tag_template)
- context["task_instance"].xcom_push(key="tag_template_id", value=tag_template)
+ self.xcom_push(context, key="tag_template_id", value=tag_template)
+ DataCatalogTagTemplateLink.persist(
+ context=context,
+ task_instance=self,
+ tag_template_id=self.tag_template_id,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return TagTemplate.to_dict(result)
@@ -532,6 +572,7 @@ class CloudDataCatalogCreateTagTemplateFieldOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogTagTemplateLink(),)
def __init__(
self,
@@ -588,7 +629,14 @@ class CloudDataCatalogCreateTagTemplateFieldOperator(BaseOperator):
result = tag_template.fields[self.tag_template_field_id]
self.log.info("Current Tag ID: %s", self.tag_template_field_id)
- context["task_instance"].xcom_push(key="tag_template_field_id", value=self.tag_template_field_id)
+ self.xcom_push(context, key="tag_template_field_id", value=self.tag_template_field_id)
+ DataCatalogTagTemplateLink.persist(
+ context=context,
+ task_instance=self,
+ tag_template_id=self.tag_template,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return TagTemplateField.to_dict(result)
@@ -1067,6 +1115,7 @@ class CloudDataCatalogGetEntryOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogEntryLink(),)
def __init__(
self,
@@ -1106,6 +1155,14 @@ class CloudDataCatalogGetEntryOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ DataCatalogEntryLink.persist(
+ context=context,
+ task_instance=self,
+ entry_id=self.entry,
+ entry_group_id=self.entry_group,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return Entry.to_dict(result)
@@ -1153,6 +1210,7 @@ class CloudDataCatalogGetEntryGroupOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogEntryGroupLink(),)
def __init__(
self,
@@ -1192,6 +1250,13 @@ class CloudDataCatalogGetEntryGroupOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ DataCatalogEntryGroupLink.persist(
+ context=context,
+ task_instance=self,
+ entry_group_id=self.entry_group,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return EntryGroup.to_dict(result)
@@ -1234,6 +1299,7 @@ class CloudDataCatalogGetTagTemplateOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogTagTemplateLink(),)
def __init__(
self,
@@ -1270,6 +1336,13 @@ class CloudDataCatalogGetTagTemplateOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ DataCatalogTagTemplateLink.persist(
+ context=context,
+ task_instance=self,
+ tag_template_id=self.tag_template,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return TagTemplate.to_dict(result)
@@ -1319,6 +1392,7 @@ class CloudDataCatalogListTagsOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogEntryLink(),)
def __init__(
self,
@@ -1361,6 +1435,14 @@ class CloudDataCatalogListTagsOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ DataCatalogEntryLink.persist(
+ context=context,
+ task_instance=self,
+ entry_id=self.entry,
+ entry_group_id=self.entry_group,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
return [Tag.to_dict(item) for item in result]
@@ -1406,6 +1488,7 @@ class CloudDataCatalogLookupEntryOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogEntryLink(),)
def __init__(
self,
@@ -1441,6 +1524,16 @@ class CloudDataCatalogLookupEntryOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+
+ project_id, location_id, entry_group_id, entry_id = result.name.split('/')[1::2]
+ DataCatalogEntryLink.persist(
+ context=context,
+ task_instance=self,
+ entry_id=entry_id,
+ entry_group_id=entry_group_id,
+ location_id=location_id,
+ project_id=project_id,
+ )
return Entry.to_dict(result)
@@ -1489,6 +1582,7 @@ class CloudDataCatalogRenameTagTemplateFieldOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogTagTemplateLink(),)
def __init__(
self,
@@ -1531,6 +1625,13 @@ class CloudDataCatalogRenameTagTemplateFieldOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ DataCatalogTagTemplateLink.persist(
+ context=context,
+ task_instance=self,
+ tag_template_id=self.tag_template,
+ location_id=self.location,
+ project_id=self.project_id or hook.project_id,
+ )
class CloudDataCatalogSearchCatalogOperator(BaseOperator):
@@ -1695,6 +1796,7 @@ class CloudDataCatalogUpdateEntryOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogEntryLink(),)
def __init__(
self,
@@ -1729,7 +1831,7 @@ class CloudDataCatalogUpdateEntryOperator(BaseOperator):
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
- hook.update_entry(
+ result = hook.update_entry(
entry=self.entry,
update_mask=self.update_mask,
location=self.location,
@@ -1741,6 +1843,16 @@ class CloudDataCatalogUpdateEntryOperator(BaseOperator):
metadata=self.metadata,
)
+ location_id, entry_group_id, entry_id = result.name.split("/")[3::2]
+ DataCatalogEntryLink.persist(
+ context=context,
+ task_instance=self,
+ entry_id=self.entry_id or entry_id,
+ entry_group_id=self.entry_group or entry_group_id,
+ location_id=self.location or location_id,
+ project_id=self.project_id or hook.project_id,
+ )
+
class CloudDataCatalogUpdateTagOperator(BaseOperator):
"""
@@ -1795,6 +1907,7 @@ class CloudDataCatalogUpdateTagOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogEntryLink(),)
def __init__(
self,
@@ -1831,7 +1944,7 @@ class CloudDataCatalogUpdateTagOperator(BaseOperator):
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
- hook.update_tag(
+ result = hook.update_tag(
tag=self.tag,
update_mask=self.update_mask,
location=self.location,
@@ -1844,6 +1957,16 @@ class CloudDataCatalogUpdateTagOperator(BaseOperator):
metadata=self.metadata,
)
+ location_id, entry_group_id, entry_id = result.name.split("/")[3:8:2]
+ DataCatalogEntryLink.persist(
+ context=context,
+ task_instance=self,
+ entry_id=self.entry or entry_id,
+ entry_group_id=self.entry_group or entry_group_id,
+ location_id=self.location or location_id,
+ project_id=self.project_id or hook.project_id,
+ )
+
class CloudDataCatalogUpdateTagTemplateOperator(BaseOperator):
"""
@@ -1900,6 +2023,7 @@ class CloudDataCatalogUpdateTagTemplateOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogTagTemplateLink(),)
def __init__(
self,
@@ -1932,7 +2056,7 @@ class CloudDataCatalogUpdateTagTemplateOperator(BaseOperator):
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
- hook.update_tag_template(
+ result = hook.update_tag_template(
tag_template=self.tag_template,
update_mask=self.update_mask,
location=self.location,
@@ -1943,6 +2067,15 @@ class CloudDataCatalogUpdateTagTemplateOperator(BaseOperator):
metadata=self.metadata,
)
+ location_id, tag_template_id = result.name.split("/")[3::2]
+ DataCatalogTagTemplateLink.persist(
+ context=context,
+ task_instance=self,
+ tag_template_id=self.tag_template_id or tag_template_id,
+ location_id=self.location or location_id,
+ project_id=self.project_id or hook.project_id,
+ )
+
class CloudDataCatalogUpdateTagTemplateFieldOperator(BaseOperator):
"""
@@ -2005,6 +2138,7 @@ class CloudDataCatalogUpdateTagTemplateFieldOperator(BaseOperator):
"gcp_conn_id",
"impersonation_chain",
)
+ operator_extra_links = (DataCatalogTagTemplateLink(),)
def __init__(
self,
@@ -2041,7 +2175,7 @@ class CloudDataCatalogUpdateTagTemplateFieldOperator(BaseOperator):
hook = CloudDataCatalogHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
- hook.update_tag_template_field(
+ result = hook.update_tag_template_field(
tag_template_field=self.tag_template_field,
update_mask=self.update_mask,
tag_template_field_name=self.tag_template_field_name,
@@ -2053,3 +2187,12 @@ class CloudDataCatalogUpdateTagTemplateFieldOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+
+ location_id, tag_template_id = result.name.split("/")[3:6:2]
+ DataCatalogTagTemplateLink.persist(
+ context=context,
+ task_instance=self,
+ tag_template_id=self.tag_template or tag_template_id,
+ location_id=self.location or location_id,
+ project_id=self.project_id or hook.project_id,
+ )
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 225333d543..a4962fb714 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -940,6 +940,9 @@ extra-links:
- airflow.providers.google.cloud.links.bigquery_dts.BigQueryDataTransferConfigLink
- airflow.providers.google.cloud.links.cloud_tasks.CloudTasksQueueLink
- airflow.providers.google.cloud.links.cloud_tasks.CloudTasksLink
+ - airflow.providers.google.cloud.links.datacatalog.DataCatalogEntryGroupLink
+ - airflow.providers.google.cloud.links.datacatalog.DataCatalogEntryLink
+ - airflow.providers.google.cloud.links.datacatalog.DataCatalogTagTemplateLink
- airflow.providers.google.cloud.links.dataproc.DataprocLink
- airflow.providers.google.cloud.links.dataproc.DataprocListLink
- airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDetailedLink
diff --git a/docs/apache-airflow-providers-google/operators/cloud/datacatalog.rst b/docs/apache-airflow-providers-google/operators/cloud/datacatalog.rst
index 01d409714a..2c51926f68 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/datacatalog.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/datacatalog.rst
@@ -59,7 +59,7 @@ operators.
The ``CloudDataCatalogGetEntryOperator`` uses the Project ID, Entry Group ID, and Entry ID to get the entry.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_entry]
@@ -71,7 +71,7 @@ parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_entry_result]
@@ -79,7 +79,7 @@ The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used b
The ``CloudDataCatalogLookupEntryOperator`` uses the resource name to get the entry.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
@@ -91,7 +91,7 @@ parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_lookup_entry_result]
@@ -105,7 +105,7 @@ Creating an entry
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateEntryOperator`
operator creates the entry.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_entry_gcs]
@@ -117,15 +117,9 @@ parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
- :language: python
- :dedent: 4
- :start-after: [START howto_operator_gcp_datacatalog_create_entry_gcs_result2]
- :end-before: [END howto_operator_gcp_datacatalog_create_entry_gcs_result2]
-
The newly created entry ID can be read with the ``entry_id`` key.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
@@ -139,7 +133,7 @@ Updating an entry
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateEntryOperator`
operator updates the entry.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_update_entry]
@@ -157,7 +151,7 @@ Deleting a entry
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteEntryOperator`
operator deletes the entry.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_delete_entry]
@@ -186,7 +180,7 @@ Creating an entry group
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateEntryGroupOperator`
operator creates the entry group.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_entry_group]
@@ -198,19 +192,13 @@ parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
- :language: python
- :dedent: 4
- :start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result2]
- :end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result2]
-
The newly created entry group ID can be read with the ``entry_group_id`` key.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
- :start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result2]
- :end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result2]
+ :start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result]
+ :end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result]
.. _howto/operator:CloudDataCatalogGetEntryGroupOperator:
@@ -220,7 +208,7 @@ Getting an entry group
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogGetEntryGroupOperator`
operator gets the entry group.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_entry_group]
@@ -232,7 +220,7 @@ parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_entry_group_result]
@@ -246,7 +234,7 @@ Deleting an entry group
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteEntryGroupOperator`
operator deletes the entry group.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_entries.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_delete_entry_group]
@@ -258,8 +246,8 @@ parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogTagTemplateOperators:
-Managing a tag templates
-^^^^^^^^^^^^^^^^^^^^^^^^
+Managing tag templates
+^^^^^^^^^^^^^^^^^^^^^^
Operators use a :class:`~google.cloud.datacatalog_v1beta1.types.TagTemplate` for representing tag templates.
@@ -269,13 +257,13 @@ Operators uses a :class:`~google.cloud.datacatalog_v1beta1.types.TagTemplate` fo
.. _howto/operator:CloudDataCatalogCreateTagTemplateOperator:
-Creating a tag templates
-""""""""""""""""""""""""
+Creating a tag template
+"""""""""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagTemplateOperator`
operator creates the tag template.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag_template]
@@ -287,15 +275,9 @@ parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
- :language: python
- :dedent: 4
- :start-after: [START howto_operator_gcp_datacatalog_create_tag_template_result2]
- :end-before: [END howto_operator_gcp_datacatalog_create_tag_template_result2]
-
The newly created tag template ID can be read with the ``tag_template_id`` key.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag_template_result]
@@ -309,7 +291,7 @@ Deleting a tag template
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagTemplateOperator`
operator deletes the tag template.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_delete_tag_template]
@@ -328,7 +310,7 @@ Getting a tag template
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogGetTagTemplateOperator`
operator gets the tag template.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_tag_template]
@@ -340,7 +322,7 @@ parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_tag_template_result]
@@ -354,7 +336,7 @@ Updating a tag template
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagTemplateOperator`
operator updates the tag template.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_update_tag_template]
@@ -366,8 +348,8 @@ parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogTagOperators:
-Managing a tags
-^^^^^^^^^^^^^^^
+Managing tags
+^^^^^^^^^^^^^
Operators use a :class:`~google.cloud.datacatalog_v1beta1.types.Tag` for representing a tag.
@@ -383,7 +365,7 @@ Creating a tag on an entry
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagOperator`
operator creates the tag on an entry.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag]
@@ -395,33 +377,27 @@ parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
- :language: python
- :dedent: 4
- :start-after: [START howto_operator_gcp_datacatalog_create_tag_result2]
- :end-before: [END howto_operator_gcp_datacatalog_create_tag_result2]
-
The newly created tag ID can be read with the ``tag_id`` key.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
:language: python
:dedent: 4
- :start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result2]
- :end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result2]
+ :start-after: [START howto_operator_gcp_datacatalog_create_tag_result]
+ :end-before: [END howto_operator_gcp_datacatalog_create_tag_result]
.. _howto/operator:CloudDataCatalogUpdateTagOperator:
-Updating an tag
-"""""""""""""""
+Updating a tag
+""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagOperator`
operator updates the tag.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
:language: python
:dedent: 4
- :start-after: [START howto_operator_gcp_datacatalog_update_tag_template]
- :end-before: [END howto_operator_gcp_datacatalog_update_tag_template]
+ :start-after: [START howto_operator_gcp_datacatalog_update_tag]
+ :end-before: [END howto_operator_gcp_datacatalog_update_tag]
You can use :ref:`Jinja templating <concepts:jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagOperator`
@@ -429,17 +405,17 @@ parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogDeleteTagOperator:
-Deleting an tag
-"""""""""""""""
+Deleting a tag
+""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagOperator`
operator deletes the tag.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
:language: python
:dedent: 4
- :start-after: [START howto_operator_gcp_datacatalog_delete_tag_template]
- :end-before: [END howto_operator_gcp_datacatalog_delete_tag_template]
+ :start-after: [START howto_operator_gcp_datacatalog_delete_tag]
+ :end-before: [END howto_operator_gcp_datacatalog_delete_tag]
You can use :ref:`Jinja templating <concepts:jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagOperator`
@@ -447,13 +423,13 @@ parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogListTagsOperator:
-Listing an tags on an entry
-"""""""""""""""""""""""""""
+Listing tags on an entry
+""""""""""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogListTagsOperator`
operator gets the list of tags on the entry.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_list_tags]
@@ -465,7 +441,7 @@ parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tags.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_list_tags_result]
@@ -491,7 +467,7 @@ Creating a field
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagTemplateFieldOperator`
operator creates the tag template field.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag_template_field]
@@ -503,19 +479,13 @@ parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
- :language: python
- :dedent: 4
- :start-after: [START howto_operator_gcp_datacatalog_create_tag_template_field_result2]
- :end-before: [END howto_operator_gcp_datacatalog_create_tag_template_field_result2]
-
The newly created field ID can be read with the ``tag_template_field_id`` key.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
:language: python
:dedent: 4
- :start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result2]
- :end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result2]
+ :start-after: [START howto_operator_gcp_datacatalog_create_tag_template_field_result]
+ :end-before: [END howto_operator_gcp_datacatalog_create_tag_template_field_result]
.. _howto/operator:CloudDataCatalogRenameTagTemplateFieldOperator:
@@ -525,7 +495,7 @@ Renaming a field
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogRenameTagTemplateFieldOperator`
operator renames the tag template field.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_rename_tag_template_field]
@@ -543,7 +513,7 @@ Updating a field
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagTemplateFieldOperator`
operator updates the tag template field.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_update_tag_template_field]
@@ -562,7 +532,7 @@ Deleting a field
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagTemplateFieldOperator`
operator deletes the tag template field.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_delete_tag_template_field]
@@ -583,7 +553,7 @@ operator searches Data Catalog for multiple resources like entries, tags that ma
The ``query`` parameter should be defined using `search syntax <https://cloud.google.com/data-catalog/docs/how-to/search-reference>`__.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_search_catalog]
@@ -595,7 +565,7 @@ parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
+.. exampleinclude:: /../../tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_search_catalog_result]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 44362b9b93..df08668eec 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1135,6 +1135,7 @@ opsgenie
optimise
optionality
ora
+oracledb
orchestrator
orgtbl
orm
diff --git a/tests/providers/google/cloud/operators/test_datacatalog.py b/tests/providers/google/cloud/operators/test_datacatalog.py
index ff6f14a10f..e35024f5a1 100644
--- a/tests/providers/google/cloud/operators/test_datacatalog.py
+++ b/tests/providers/google/cloud/operators/test_datacatalog.py
@@ -21,7 +21,7 @@ from unittest import TestCase, mock
from google.api_core.exceptions import AlreadyExists
from google.api_core.gapic_v1.method import _MethodDefault
from google.api_core.retry import Retry
-from google.cloud.datacatalog_v1beta1.types import Entry, EntryGroup, Tag, TagTemplate, TagTemplateField
+from google.cloud.datacatalog import Entry, EntryGroup, Tag, TagTemplate, TagTemplateField
from google.protobuf.field_mask_pb2 import FieldMask
from airflow.providers.google.cloud.operators.datacatalog import (
@@ -47,8 +47,8 @@ from airflow.providers.google.cloud.operators.datacatalog import (
CloudDataCatalogUpdateTagTemplateFieldOperator,
CloudDataCatalogUpdateTagTemplateOperator,
)
-from airflow.utils.context import Context
+BASE_PATH = "airflow.providers.google.cloud.operators.datacatalog.{}"
TEST_PROJECT_ID: str = "example_id"
TEST_LOCATION: str = "en-west-3"
TEST_ENTRY_ID: str = "test-entry-id"
@@ -94,6 +94,8 @@ TEST_ENTRY_DICT: Dict = {
'description': '',
'display_name': '',
'linked_resource': '',
+ 'fully_qualified_name': '',
+ 'labels': {},
'name': TEST_ENTRY_PATH,
}
TEST_ENTRY_GROUP: EntryGroup = EntryGroup(name=TEST_ENTRY_GROUP_PATH)
@@ -101,14 +103,24 @@ TEST_ENTRY_GROUP_DICT: Dict = {'description': '', 'display_name': '', 'name': TE
TEST_TAG: Tag = Tag(name=TEST_TAG_PATH)
TEST_TAG_DICT: Dict = {'fields': {}, 'name': TEST_TAG_PATH, 'template': '', 'template_display_name': ''}
TEST_TAG_TEMPLATE: TagTemplate = TagTemplate(name=TEST_TAG_TEMPLATE_PATH)
-TEST_TAG_TEMPLATE_DICT: Dict = {'display_name': '', 'fields': {}, 'name': TEST_TAG_TEMPLATE_PATH}
+TEST_TAG_TEMPLATE_DICT: Dict = {
+ 'display_name': '',
+ 'fields': {},
+ 'is_publicly_readable': False,
+ 'name': TEST_TAG_TEMPLATE_PATH,
+}
TEST_TAG_TEMPLATE_FIELD: TagTemplateField = TagTemplateField(name=TEST_TAG_TEMPLATE_FIELD_ID)
TEST_TAG_TEMPLATE_FIELD_DICT: Dict = {
+ 'description': '',
'display_name': '',
'is_required': False,
'name': TEST_TAG_TEMPLATE_FIELD_ID,
'order': 0,
}
+TEST_ENTRY_LINK = "projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}/entries/{entry_id}"
+TEST_TAG_TEMPLATE_LINK = "projects/{project_id}/locations/{location}/tagTemplates/{tag_template_id}"
+TEST_TAG_TEMPLATE_FIELD_LINK = "projects/{project_id}/locations/{location}/tagTemplates/{tag_template_id}\
+ /fields/{tag_template_field_id}"
class TestCloudDataCatalogCreateEntryOperator(TestCase):
@@ -116,7 +128,8 @@ class TestCloudDataCatalogCreateEntryOperator(TestCase):
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
**{"return_value.create_entry.return_value": TEST_ENTRY},
)
- def test_assert_valid_hook_call(self, mock_hook) -> None:
+ @mock.patch(BASE_PATH.format("CloudDataCatalogCreateEntryOperator.xcom_push"))
+ def test_assert_valid_hook_call(self, mock_xcom, mock_hook) -> None:
task = CloudDataCatalogCreateEntryOperator(
task_id="task_id",
location=TEST_LOCATION,
@@ -130,8 +143,8 @@ class TestCloudDataCatalogCreateEntryOperator(TestCase):
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
- ti = mock.MagicMock()
- result = task.execute(context=Context(task_instance=ti))
+ context = mock.MagicMock()
+ result = task.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -146,11 +159,21 @@ class TestCloudDataCatalogCreateEntryOperator(TestCase):
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
)
- ti.xcom_push.assert_called_once_with(key="entry_id", value=TEST_ENTRY_ID)
+ mock_xcom.assert_called_with(
+ context,
+ key="data_catalog_entry",
+ value={
+ "entry_id": TEST_ENTRY_ID,
+ "entry_group_id": TEST_ENTRY_GROUP_ID,
+ "location_id": TEST_LOCATION,
+ "project_id": TEST_PROJECT_ID,
+ },
+ )
assert TEST_ENTRY_DICT == result
@mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
- def test_assert_valid_hook_call_when_exists(self, mock_hook) -> None:
+ @mock.patch(BASE_PATH.format("CloudDataCatalogCreateEntryOperator.xcom_push"))
+ def test_assert_valid_hook_call_when_exists(self, mock_xcom, mock_hook) -> None:
mock_hook.return_value.create_entry.side_effect = AlreadyExists(message="message")
mock_hook.return_value.get_entry.return_value = TEST_ENTRY
task = CloudDataCatalogCreateEntryOperator(
@@ -166,8 +189,8 @@ class TestCloudDataCatalogCreateEntryOperator(TestCase):
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
- ti = mock.MagicMock()
- result = task.execute(context=Context(task_instance=ti))
+ context = mock.MagicMock()
+ result = task.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -191,7 +214,16 @@ class TestCloudDataCatalogCreateEntryOperator(TestCase):
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
)
- ti.xcom_push.assert_called_once_with(key="entry_id", value=TEST_ENTRY_ID)
+ mock_xcom.assert_called_with(
+ context,
+ key="data_catalog_entry",
+ value={
+ "entry_id": TEST_ENTRY_ID,
+ "entry_group_id": TEST_ENTRY_GROUP_ID,
+ "location_id": TEST_LOCATION,
+ "project_id": TEST_PROJECT_ID,
+ },
+ )
assert TEST_ENTRY_DICT == result
@@ -200,7 +232,8 @@ class TestCloudDataCatalogCreateEntryGroupOperator(TestCase):
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
**{"return_value.create_entry_group.return_value": TEST_ENTRY_GROUP},
)
- def test_assert_valid_hook_call(self, mock_hook) -> None:
+ @mock.patch(BASE_PATH.format("CloudDataCatalogCreateEntryGroupOperator.xcom_push"))
+ def test_assert_valid_hook_call(self, mock_xcom, mock_hook) -> None:
task = CloudDataCatalogCreateEntryGroupOperator(
task_id="task_id",
location=TEST_LOCATION,
@@ -213,8 +246,8 @@ class TestCloudDataCatalogCreateEntryGroupOperator(TestCase):
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
- ti = mock.MagicMock()
- result = task.execute(context=Context(task_instance=ti))
+ context = mock.MagicMock()
+ result = task.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -228,7 +261,15 @@ class TestCloudDataCatalogCreateEntryGroupOperator(TestCase):
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
)
- ti.xcom_push.assert_called_once_with(key="entry_group_id", value=TEST_ENTRY_GROUP_ID)
+ mock_xcom.assert_called_with(
+ context,
+ key="data_catalog_entry_group",
+ value={
+ "entry_group_id": TEST_ENTRY_GROUP_ID,
+ "location_id": TEST_LOCATION,
+ "project_id": TEST_PROJECT_ID,
+ },
+ )
assert result == TEST_ENTRY_GROUP_DICT
@@ -237,7 +278,8 @@ class TestCloudDataCatalogCreateTagOperator(TestCase):
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
**{"return_value.create_tag.return_value": TEST_TAG},
)
- def test_assert_valid_hook_call(self, mock_hook) -> None:
+ @mock.patch(BASE_PATH.format("CloudDataCatalogCreateTagOperator.xcom_push"))
+ def test_assert_valid_hook_call(self, mock_xcom, mock_hook) -> None:
task = CloudDataCatalogCreateTagOperator(
task_id="task_id",
location=TEST_LOCATION,
@@ -252,8 +294,8 @@ class TestCloudDataCatalogCreateTagOperator(TestCase):
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
- ti = mock.MagicMock()
- result = task.execute(context=Context(task_instance=ti))
+ context = mock.MagicMock()
+ result = task.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -269,7 +311,16 @@ class TestCloudDataCatalogCreateTagOperator(TestCase):
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
)
- ti.xcom_push.assert_called_once_with(key="tag_id", value=TEST_TAG_ID)
+ mock_xcom.assert_called_with(
+ context,
+ key="data_catalog_entry",
+ value={
+ "entry_id": TEST_ENTRY_ID,
+ "entry_group_id": TEST_ENTRY_GROUP_ID,
+ "location_id": TEST_LOCATION,
+ "project_id": TEST_PROJECT_ID,
+ },
+ )
assert TEST_TAG_DICT == result
@@ -278,7 +329,8 @@ class TestCloudDataCatalogCreateTagTemplateOperator(TestCase):
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
**{"return_value.create_tag_template.return_value": TEST_TAG_TEMPLATE},
)
- def test_assert_valid_hook_call(self, mock_hook) -> None:
+ @mock.patch(BASE_PATH.format("CloudDataCatalogCreateTagTemplateOperator.xcom_push"))
+ def test_assert_valid_hook_call(self, mock_xcom, mock_hook) -> None:
task = CloudDataCatalogCreateTagTemplateOperator(
task_id="task_id",
location=TEST_LOCATION,
@@ -291,8 +343,8 @@ class TestCloudDataCatalogCreateTagTemplateOperator(TestCase):
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
- ti = mock.MagicMock()
- result = task.execute(context=Context(task_instance=ti))
+ context = mock.MagicMock()
+ result = task.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -306,7 +358,15 @@ class TestCloudDataCatalogCreateTagTemplateOperator(TestCase):
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
)
- ti.xcom_push.assert_called_once_with(key="tag_template_id", value=TEST_TAG_TEMPLATE_ID)
+ mock_xcom.assert_called_with(
+ context,
+ key="data_catalog_tag_template",
+ value={
+ "tag_template_id": TEST_TAG_TEMPLATE_ID,
+ "location_id": TEST_LOCATION,
+ "project_id": TEST_PROJECT_ID,
+ },
+ )
assert TEST_TAG_TEMPLATE_DICT == result
@@ -315,7 +375,8 @@ class TestCloudDataCatalogCreateTagTemplateFieldOperator(TestCase):
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
**{"return_value.create_tag_template_field.return_value": TEST_TAG_TEMPLATE_FIELD}, # type: ignore
)
- def test_assert_valid_hook_call(self, mock_hook) -> None:
+ @mock.patch(BASE_PATH.format("CloudDataCatalogCreateTagTemplateFieldOperator.xcom_push"))
+ def test_assert_valid_hook_call(self, mock_xcom, mock_hook) -> None:
task = CloudDataCatalogCreateTagTemplateFieldOperator(
task_id="task_id",
location=TEST_LOCATION,
@@ -329,8 +390,8 @@ class TestCloudDataCatalogCreateTagTemplateFieldOperator(TestCase):
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
)
- ti = mock.MagicMock()
- result = task.execute(context=Context(task_instance=ti))
+ context = mock.MagicMock()
+ result = task.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=TEST_GCP_CONN_ID,
impersonation_chain=TEST_IMPERSONATION_CHAIN,
@@ -345,7 +406,15 @@ class TestCloudDataCatalogCreateTagTemplateFieldOperator(TestCase):
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
)
- ti.xcom_push.assert_called_once_with(key="tag_template_field_id", value=TEST_TAG_TEMPLATE_FIELD_ID)
+ mock_xcom.assert_called_with(
+ context,
+ key="data_catalog_tag_template",
+ value={
+ "tag_template_id": TEST_TAG_TEMPLATE_ID,
+ "location_id": TEST_LOCATION,
+ "project_id": TEST_PROJECT_ID,
+ },
+ )
assert TEST_TAG_TEMPLATE_FIELD_DICT == result
@@ -739,6 +808,12 @@ class TestCloudDataCatalogSearchCatalogOperator(TestCase):
class TestCloudDataCatalogUpdateEntryOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
def test_assert_valid_hook_call(self, mock_hook) -> None:
+ mock_hook.return_value.update_entry.return_value.name = TEST_ENTRY_LINK.format(
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ entry_group_id=TEST_ENTRY_GROUP_ID,
+ entry_id=TEST_ENTRY_ID,
+ )
task = CloudDataCatalogUpdateEntryOperator(
task_id="task_id",
entry=TEST_ENTRY,
@@ -774,6 +849,12 @@ class TestCloudDataCatalogUpdateEntryOperator(TestCase):
class TestCloudDataCatalogUpdateTagOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
def test_assert_valid_hook_call(self, mock_hook) -> None:
+ mock_hook.return_value.update_tag.return_value.name = TEST_ENTRY_LINK.format(
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ entry_group_id=TEST_ENTRY_GROUP_ID,
+ entry_id=TEST_ENTRY_ID,
+ )
task = CloudDataCatalogUpdateTagOperator(
task_id="task_id",
tag=Tag(name=TEST_TAG_ID),
@@ -811,6 +892,11 @@ class TestCloudDataCatalogUpdateTagOperator(TestCase):
class TestCloudDataCatalogUpdateTagTemplateOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
def test_assert_valid_hook_call(self, mock_hook) -> None:
+ mock_hook.return_value.update_tag_template.return_value.name = TEST_TAG_TEMPLATE_LINK.format(
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ tag_template_id=TEST_TAG_TEMPLATE_ID,
+ )
task = CloudDataCatalogUpdateTagTemplateOperator(
task_id="task_id",
tag_template=TagTemplate(name=TEST_TAG_TEMPLATE_ID),
@@ -844,6 +930,14 @@ class TestCloudDataCatalogUpdateTagTemplateOperator(TestCase):
class TestCloudDataCatalogUpdateTagTemplateFieldOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
def test_assert_valid_hook_call(self, mock_hook) -> None:
+ mock_hook.return_value.update_tag_template_field.return_value.name = (
+ TEST_TAG_TEMPLATE_FIELD_LINK.format(
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ tag_template_id=TEST_TAG_TEMPLATE_ID,
+ tag_template_field_id=TEST_TAG_TEMPLATE_FIELD_ID,
+ )
+ )
task = CloudDataCatalogUpdateTagTemplateFieldOperator(
task_id="task_id",
tag_template_field=TEST_TAG_TEMPLATE_FIELD,
diff --git a/tests/providers/google/cloud/operators/test_datacatalog_system.py b/tests/system/providers/google/datacatalog/__init__.py
similarity index 57%
rename from tests/providers/google/cloud/operators/test_datacatalog_system.py
rename to tests/system/providers/google/datacatalog/__init__.py
index 00e1ce0b8e..13a83393a9 100644
--- a/tests/providers/google/cloud/operators/test_datacatalog_system.py
+++ b/tests/system/providers/google/datacatalog/__init__.py
@@ -1,4 +1,3 @@
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,20 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-import pytest
-
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_DATACATALOG_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
-@pytest.mark.credential_file(GCP_DATACATALOG_KEY)
-class CloudDataCatalogExampleDagsSystemTest(GoogleSystemTest):
- def setUp(self):
- super().setUp()
-
- @provide_gcp_context(GCP_DATACATALOG_KEY)
- def test_run_example_gcp_dataflow_native_java(self):
- self.run_dag('example_gcp_datacatalog', CLOUD_DAG_FOLDER)
-
- def tearDown(self):
- super().tearDown()
diff --git a/tests/system/providers/google/datacatalog/example_datacatalog_entries.py b/tests/system/providers/google/datacatalog/example_datacatalog_entries.py
new file mode 100644
index 0000000000..cc7430452b
--- /dev/null
+++ b/tests/system/providers/google/datacatalog/example_datacatalog_entries.py
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.datacatalog import (
+ CloudDataCatalogCreateEntryGroupOperator,
+ CloudDataCatalogCreateEntryOperator,
+ CloudDataCatalogDeleteEntryGroupOperator,
+ CloudDataCatalogDeleteEntryOperator,
+ CloudDataCatalogGetEntryGroupOperator,
+ CloudDataCatalogGetEntryOperator,
+ CloudDataCatalogLookupEntryOperator,
+ CloudDataCatalogUpdateEntryOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datacatalog_entries"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+LOCATION = "us-central1"
+ENTRY_GROUP_ID = f"id_{DAG_ID}_{ENV_ID}"
+ENTRY_GROUP_NAME = f"name {DAG_ID} {ENV_ID}"
+ENTRY_ID = "python_files"
+ENTRY_NAME = "Wizard"
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+) as dag:
+ create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
+
+ # Create
+ # [START howto_operator_gcp_datacatalog_create_entry_group]
+ create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
+ task_id="create_entry_group",
+ location=LOCATION,
+ entry_group_id=ENTRY_GROUP_ID,
+ entry_group={"display_name": ENTRY_GROUP_NAME},
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_group]
+
+ # [START howto_operator_gcp_datacatalog_create_entry_group_result]
+ create_entry_group_result = BashOperator(
+ task_id="create_entry_group_result",
+ bash_command=f"echo {create_entry_group.output['entry_group_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_group_result]
+
+ # [START howto_operator_gcp_datacatalog_create_entry_gcs]
+ create_entry_gcs = CloudDataCatalogCreateEntryOperator(
+ task_id="create_entry_gcs",
+ location=LOCATION,
+ entry_group=ENTRY_GROUP_ID,
+ entry_id=ENTRY_ID,
+ entry={
+ "display_name": ENTRY_NAME,
+ "type_": "FILESET",
+ "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
+ },
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_gcs]
+
+ # [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
+ create_entry_gcs_result = BashOperator(
+ task_id="create_entry_gcs_result",
+ bash_command=f"echo {create_entry_gcs.output['entry_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_gcs_result]
+
+ # Get
+ # [START howto_operator_gcp_datacatalog_get_entry_group]
+ get_entry_group = CloudDataCatalogGetEntryGroupOperator(
+ task_id="get_entry_group",
+ location=LOCATION,
+ entry_group=ENTRY_GROUP_ID,
+ read_mask=FieldMask(paths=["name", "display_name"]),
+ )
+ # [END howto_operator_gcp_datacatalog_get_entry_group]
+
+ # [START howto_operator_gcp_datacatalog_get_entry_group_result]
+ get_entry_group_result = BashOperator(
+ task_id="get_entry_group_result",
+ bash_command=f"echo {get_entry_group.output}",
+ )
+ # [END howto_operator_gcp_datacatalog_get_entry_group_result]
+
+ # [START howto_operator_gcp_datacatalog_get_entry]
+ get_entry = CloudDataCatalogGetEntryOperator(
+ task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+ )
+ # [END howto_operator_gcp_datacatalog_get_entry]
+
+ # [START howto_operator_gcp_datacatalog_get_entry_result]
+ get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")
+ # [END howto_operator_gcp_datacatalog_get_entry_result]
+
+ # Lookup
+ # [START howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
+ current_entry_template = (
+ "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
+ "entryGroups/{entry_group}/entries/{entry}"
+ )
+ lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
+ task_id="lookup_entry",
+ linked_resource=current_entry_template.format(
+ project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+ ),
+ )
+ # [END howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
+
+ # [START howto_operator_gcp_datacatalog_lookup_entry_result]
+ lookup_entry_result = BashOperator(
+ task_id="lookup_entry_result",
+ bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
+ )
+ # [END howto_operator_gcp_datacatalog_lookup_entry_result]
+
+ # Update
+ # [START howto_operator_gcp_datacatalog_update_entry]
+ update_entry = CloudDataCatalogUpdateEntryOperator(
+ task_id="update_entry",
+ entry={"display_name": f"{ENTRY_NAME} UPDATED"},
+ update_mask={"paths": ["display_name"]},
+ location=LOCATION,
+ entry_group=ENTRY_GROUP_ID,
+ entry_id=ENTRY_ID,
+ )
+ # [END howto_operator_gcp_datacatalog_update_entry]
+
+ # Delete
+ # [START howto_operator_gcp_datacatalog_delete_entry]
+ delete_entry = CloudDataCatalogDeleteEntryOperator(
+ task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+ )
+ # [END howto_operator_gcp_datacatalog_delete_entry]
+ delete_entry.trigger_rule = TriggerRule.ALL_DONE
+
+ # [START howto_operator_gcp_datacatalog_delete_entry_group]
+ delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
+ task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
+ )
+ # [END howto_operator_gcp_datacatalog_delete_entry_group]
+ delete_entry_group.trigger_rule = TriggerRule.ALL_DONE
+
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+ )
+
+ (
+ # TEST SETUP
+ create_bucket
+ # TEST BODY
+ >> create_entry_group
+ >> create_entry_group_result
+ >> get_entry_group
+ >> get_entry_group_result
+ >> create_entry_gcs
+ >> create_entry_gcs_result
+ >> get_entry
+ >> get_entry_result
+ >> lookup_entry_linked_resource
+ >> lookup_entry_result
+ >> update_entry
+ >> delete_entry
+ >> delete_entry_group
+ # TEST TEARDOWN
+ >> delete_bucket
+ )
+
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py b/tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py
new file mode 100644
index 0000000000..e12ee63c9b
--- /dev/null
+++ b/tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py
@@ -0,0 +1,227 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from google.cloud.datacatalog import TagField, TagTemplateField
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.datacatalog import (
+ CloudDataCatalogCreateEntryGroupOperator,
+ CloudDataCatalogCreateEntryOperator,
+ CloudDataCatalogCreateTagOperator,
+ CloudDataCatalogCreateTagTemplateOperator,
+ CloudDataCatalogDeleteEntryGroupOperator,
+ CloudDataCatalogDeleteEntryOperator,
+ CloudDataCatalogDeleteTagOperator,
+ CloudDataCatalogDeleteTagTemplateOperator,
+ CloudDataCatalogSearchCatalogOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datacatalog_search_catalog"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+LOCATION = "us-central1"
+ENTRY_GROUP_ID = f"id_{DAG_ID}_{ENV_ID}"
+ENTRY_GROUP_NAME = f"name {DAG_ID} {ENV_ID}"
+ENTRY_ID = "python_files"
+ENTRY_NAME = "Wizard"
+TEMPLATE_ID = "template_id"
+TAG_TEMPLATE_DISPLAY_NAME = f"Data Catalog {DAG_ID} {ENV_ID}"
+FIELD_NAME_1 = "first"
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+) as dag:
+ create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
+
+ # Create
+ # [START howto_operator_gcp_datacatalog_create_entry_group]
+ create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
+ task_id="create_entry_group",
+ location=LOCATION,
+ entry_group_id=ENTRY_GROUP_ID,
+ entry_group={"display_name": ENTRY_GROUP_NAME},
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_group]
+
+ # [START howto_operator_gcp_datacatalog_create_entry_group_result]
+ create_entry_group_result = BashOperator(
+ task_id="create_entry_group_result",
+ bash_command=f"echo {create_entry_group.output['entry_group_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_group_result]
+
+ # [START howto_operator_gcp_datacatalog_create_entry_gcs]
+ create_entry_gcs = CloudDataCatalogCreateEntryOperator(
+ task_id="create_entry_gcs",
+ location=LOCATION,
+ entry_group=ENTRY_GROUP_ID,
+ entry_id=ENTRY_ID,
+ entry={
+ "display_name": ENTRY_NAME,
+ "type_": "FILESET",
+ "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
+ },
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_gcs]
+
+ # [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
+ create_entry_gcs_result = BashOperator(
+ task_id="create_entry_gcs_result",
+ bash_command=f"echo {create_entry_gcs.output['entry_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_gcs_result]
+
+ # [START howto_operator_gcp_datacatalog_create_tag]
+ create_tag = CloudDataCatalogCreateTagOperator(
+ task_id="create_tag",
+ location=LOCATION,
+ entry_group=ENTRY_GROUP_ID,
+ entry=ENTRY_ID,
+ template_id=TEMPLATE_ID,
+ tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag]
+
+ # [START howto_operator_gcp_datacatalog_create_tag_result]
+ create_tag_result = BashOperator(
+ task_id="create_tag_result",
+ bash_command=f"echo {create_tag.output['tag_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag_result]
+
+ # [START howto_operator_gcp_datacatalog_create_tag_template]
+ create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
+ task_id="create_tag_template",
+ location=LOCATION,
+ tag_template_id=TEMPLATE_ID,
+ tag_template={
+ "display_name": TAG_TEMPLATE_DISPLAY_NAME,
+ "fields": {
+ FIELD_NAME_1: TagTemplateField(
+ display_name="first-field", type_=dict(primitive_type="STRING")
+ )
+ },
+ },
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag_template]
+
+ # [START howto_operator_gcp_datacatalog_create_tag_template_result]
+ create_tag_template_result = BashOperator(
+ task_id="create_tag_template_result",
+ bash_command=f"echo {create_tag_template.output['tag_template_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag_template_result]
+
+ # Search
+ # [START howto_operator_gcp_datacatalog_search_catalog]
+ search_catalog = CloudDataCatalogSearchCatalogOperator(
+ task_id="search_catalog", scope={"include_project_ids": [PROJECT_ID]}, query=f"projectid:{PROJECT_ID}"
+ )
+ # [END howto_operator_gcp_datacatalog_search_catalog]
+
+ # [START howto_operator_gcp_datacatalog_search_catalog_result]
+ search_catalog_result = BashOperator(
+ task_id="search_catalog_result",
+ bash_command=f"echo {search_catalog.output}",
+ )
+ # [END howto_operator_gcp_datacatalog_search_catalog_result]
+
+ # Delete
+ # [START howto_operator_gcp_datacatalog_delete_entry]
+ delete_entry = CloudDataCatalogDeleteEntryOperator(
+ task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+ )
+ # [END howto_operator_gcp_datacatalog_delete_entry]
+ delete_entry.trigger_rule = TriggerRule.ALL_DONE
+
+ # [START howto_operator_gcp_datacatalog_delete_entry_group]
+ delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
+ task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
+ )
+ # [END howto_operator_gcp_datacatalog_delete_entry_group]
+ delete_entry_group.trigger_rule = TriggerRule.ALL_DONE
+
+ # [START howto_operator_gcp_datacatalog_delete_tag]
+ delete_tag = CloudDataCatalogDeleteTagOperator(
+ task_id="delete_tag",
+ location=LOCATION,
+ entry_group=ENTRY_GROUP_ID,
+ entry=ENTRY_ID,
+ tag=create_tag.output["tag_id"],
+ )
+ # [END howto_operator_gcp_datacatalog_delete_tag]
+ delete_tag.trigger_rule = TriggerRule.ALL_DONE
+
+ # [START howto_operator_gcp_datacatalog_delete_tag_template]
+ delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
+ task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
+ )
+ # [END howto_operator_gcp_datacatalog_delete_tag_template]
+ delete_tag_template.trigger_rule = TriggerRule.ALL_DONE
+
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+ )
+
+ (
+ # TEST SETUP
+ create_bucket
+ # TEST BODY
+ >> create_entry_group
+ >> create_entry_group_result
+ >> create_entry_gcs
+ >> create_entry_gcs_result
+ >> create_tag_template
+ >> create_tag_template_result
+ >> create_tag
+ >> create_tag_result
+ >> search_catalog
+ >> search_catalog_result
+ >> delete_tag
+ >> delete_tag_template
+ >> delete_entry
+ >> delete_entry_group
+ # TEST TEARDOWN
+ >> delete_bucket
+ )
+
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py b/tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
new file mode 100644
index 0000000000..291af290da
--- /dev/null
+++ b/tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py
@@ -0,0 +1,192 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from google.cloud.datacatalog import FieldType, TagTemplateField
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.datacatalog import (
+ CloudDataCatalogCreateTagTemplateFieldOperator,
+ CloudDataCatalogCreateTagTemplateOperator,
+ CloudDataCatalogDeleteTagTemplateFieldOperator,
+ CloudDataCatalogDeleteTagTemplateOperator,
+ CloudDataCatalogGetTagTemplateOperator,
+ CloudDataCatalogRenameTagTemplateFieldOperator,
+ CloudDataCatalogUpdateTagTemplateFieldOperator,
+ CloudDataCatalogUpdateTagTemplateOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datacatalog_tag_templates"
+
+LOCATION = "us-central1"
+TEMPLATE_ID = "template_id"
+TAG_TEMPLATE_DISPLAY_NAME = f"Data Catalog {DAG_ID} {ENV_ID}"
+FIELD_NAME_1 = "first"
+FIELD_NAME_2 = "second"
+FIELD_NAME_3 = "first-rename"
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+) as dag:
+ # Create
+ # [START howto_operator_gcp_datacatalog_create_tag_template]
+ create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
+ task_id="create_tag_template",
+ location=LOCATION,
+ tag_template_id=TEMPLATE_ID,
+ tag_template={
+ "display_name": TAG_TEMPLATE_DISPLAY_NAME,
+ "fields": {
+ FIELD_NAME_1: TagTemplateField(
+ display_name="first-field", type_=dict(primitive_type="STRING")
+ )
+ },
+ },
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag_template]
+
+ # [START howto_operator_gcp_datacatalog_create_tag_template_result]
+ create_tag_template_result = BashOperator(
+ task_id="create_tag_template_result",
+ bash_command=f"echo {create_tag_template.output['tag_template_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag_template_result]
+
+ # [START howto_operator_gcp_datacatalog_create_tag_template_field]
+ create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
+ task_id="create_tag_template_field",
+ location=LOCATION,
+ tag_template=TEMPLATE_ID,
+ tag_template_field_id=FIELD_NAME_2,
+ tag_template_field=TagTemplateField(
+ display_name="second-field", type_=FieldType(primitive_type="STRING")
+ ),
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag_template_field]
+
+ # [START howto_operator_gcp_datacatalog_create_tag_template_field_result]
+ create_tag_template_field_result = BashOperator(
+ task_id="create_tag_template_field_result",
+ bash_command=f"echo {create_tag_template_field.output['tag_template_field_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag_template_field_result]
+
+ # Get
+ # [START howto_operator_gcp_datacatalog_get_tag_template]
+ get_tag_template = CloudDataCatalogGetTagTemplateOperator(
+ task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
+ )
+ # [END howto_operator_gcp_datacatalog_get_tag_template]
+
+ # [START howto_operator_gcp_datacatalog_get_tag_template_result]
+ get_tag_template_result = BashOperator(
+ task_id="get_tag_template_result",
+ bash_command=f"echo {get_tag_template.output}",
+ )
+ # [END howto_operator_gcp_datacatalog_get_tag_template_result]
+
+ # Rename
+ # [START howto_operator_gcp_datacatalog_rename_tag_template_field]
+ rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
+ task_id="rename_tag_template_field",
+ location=LOCATION,
+ tag_template=TEMPLATE_ID,
+ field=FIELD_NAME_1,
+ new_tag_template_field_id=FIELD_NAME_3,
+ )
+ # [END howto_operator_gcp_datacatalog_rename_tag_template_field]
+
+ # Update
+ # [START howto_operator_gcp_datacatalog_update_tag_template]
+ update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
+ task_id="update_tag_template",
+ tag_template={"display_name": f"{TAG_TEMPLATE_DISPLAY_NAME} UPDATED"},
+ update_mask={"paths": ["display_name"]},
+ location=LOCATION,
+ tag_template_id=TEMPLATE_ID,
+ )
+ # [END howto_operator_gcp_datacatalog_update_tag_template]
+
+ # [START howto_operator_gcp_datacatalog_update_tag_template_field]
+ update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
+ task_id="update_tag_template_field",
+ tag_template_field={"display_name": "Updated template field"},
+ update_mask={"paths": ["display_name"]},
+ location=LOCATION,
+ tag_template=TEMPLATE_ID,
+ tag_template_field_id=FIELD_NAME_1,
+ )
+ # [END howto_operator_gcp_datacatalog_update_tag_template_field]
+
+ # Delete
+ # [START howto_operator_gcp_datacatalog_delete_tag_template_field]
+ delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
+ task_id="delete_tag_template_field",
+ location=LOCATION,
+ tag_template=TEMPLATE_ID,
+ field=FIELD_NAME_2,
+ force=True,
+ )
+ # [END howto_operator_gcp_datacatalog_delete_tag_template_field]
+ delete_tag_template_field.trigger_rule = TriggerRule.ALL_DONE
+
+ # [START howto_operator_gcp_datacatalog_delete_tag_template]
+ delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
+ task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
+ )
+ # [END howto_operator_gcp_datacatalog_delete_tag_template]
+ delete_tag_template.trigger_rule = TriggerRule.ALL_DONE
+
+ (
+ # TEST BODY
+ create_tag_template
+ >> create_tag_template_result
+ >> create_tag_template_field
+ >> create_tag_template_field_result
+ >> get_tag_template
+ >> get_tag_template_result
+ >> update_tag_template
+ >> update_tag_template_field
+ >> rename_tag_template_field
+ >> delete_tag_template_field
+ >> delete_tag_template
+ )
+
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/datacatalog/example_datacatalog_tags.py b/tests/system/providers/google/datacatalog/example_datacatalog_tags.py
new file mode 100644
index 0000000000..20eeb3895a
--- /dev/null
+++ b/tests/system/providers/google/datacatalog/example_datacatalog_tags.py
@@ -0,0 +1,239 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from google.cloud.datacatalog import TagField, TagTemplateField
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.datacatalog import (
+ CloudDataCatalogCreateEntryGroupOperator,
+ CloudDataCatalogCreateEntryOperator,
+ CloudDataCatalogCreateTagOperator,
+ CloudDataCatalogCreateTagTemplateOperator,
+ CloudDataCatalogDeleteEntryGroupOperator,
+ CloudDataCatalogDeleteEntryOperator,
+ CloudDataCatalogDeleteTagOperator,
+ CloudDataCatalogDeleteTagTemplateOperator,
+ CloudDataCatalogListTagsOperator,
+ CloudDataCatalogUpdateTagOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "datacatalog_tags"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+LOCATION = "us-central1"
+ENTRY_GROUP_ID = f"id_{DAG_ID}_{ENV_ID}"
+ENTRY_GROUP_NAME = f"name {DAG_ID} {ENV_ID}"
+ENTRY_ID = "python_files"
+ENTRY_NAME = "Wizard"
+TEMPLATE_ID = "template_id"
+TAG_TEMPLATE_DISPLAY_NAME = f"Data Catalog {DAG_ID} {ENV_ID}"
+FIELD_NAME_1 = "first"
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+) as dag:
+ create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
+
+ # Create
+ # [START howto_operator_gcp_datacatalog_create_entry_group]
+ create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
+ task_id="create_entry_group",
+ location=LOCATION,
+ entry_group_id=ENTRY_GROUP_ID,
+ entry_group={"display_name": ENTRY_GROUP_NAME},
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_group]
+
+ # [START howto_operator_gcp_datacatalog_create_entry_group_result]
+ create_entry_group_result = BashOperator(
+ task_id="create_entry_group_result",
+ bash_command=f"echo {create_entry_group.output['entry_group_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_group_result]
+
+ # [START howto_operator_gcp_datacatalog_create_entry_gcs]
+ create_entry_gcs = CloudDataCatalogCreateEntryOperator(
+ task_id="create_entry_gcs",
+ location=LOCATION,
+ entry_group=ENTRY_GROUP_ID,
+ entry_id=ENTRY_ID,
+ entry={
+ "display_name": ENTRY_NAME,
+ "type_": "FILESET",
+ "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
+ },
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_gcs]
+
+ # [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
+ create_entry_gcs_result = BashOperator(
+ task_id="create_entry_gcs_result",
+ bash_command=f"echo {create_entry_gcs.output['entry_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_entry_gcs_result]
+
+ # [START howto_operator_gcp_datacatalog_create_tag]
+ create_tag = CloudDataCatalogCreateTagOperator(
+ task_id="create_tag",
+ location=LOCATION,
+ entry_group=ENTRY_GROUP_ID,
+ entry=ENTRY_ID,
+ template_id=TEMPLATE_ID,
+ tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag]
+
+ # [START howto_operator_gcp_datacatalog_create_tag_result]
+ create_tag_result = BashOperator(
+ task_id="create_tag_result",
+ bash_command=f"echo {create_tag.output['tag_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag_result]
+
+ # [START howto_operator_gcp_datacatalog_create_tag_template]
+ create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
+ task_id="create_tag_template",
+ location=LOCATION,
+ tag_template_id=TEMPLATE_ID,
+ tag_template={
+ "display_name": TAG_TEMPLATE_DISPLAY_NAME,
+ "fields": {
+ FIELD_NAME_1: TagTemplateField(
+ display_name="first-field", type_=dict(primitive_type="STRING")
+ )
+ },
+ },
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag_template]
+
+ # [START howto_operator_gcp_datacatalog_create_tag_template_result]
+ create_tag_template_result = BashOperator(
+ task_id="create_tag_template_result",
+ bash_command=f"echo {create_tag_template.output['tag_template_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_create_tag_template_result]
+
+ # List
+ # [START howto_operator_gcp_datacatalog_list_tags]
+ list_tags = CloudDataCatalogListTagsOperator(
+ task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+ )
+ # [END howto_operator_gcp_datacatalog_list_tags]
+
+ # [START howto_operator_gcp_datacatalog_list_tags_result]
+ list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")
+ # [END howto_operator_gcp_datacatalog_list_tags_result]
+
+ # Update
+ # [START howto_operator_gcp_datacatalog_update_tag]
+ update_tag = CloudDataCatalogUpdateTagOperator(
+ task_id="update_tag",
+ tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
+ update_mask={"paths": ["fields"]},
+ location=LOCATION,
+ entry_group=ENTRY_GROUP_ID,
+ entry=ENTRY_ID,
+ tag_id=f"{create_tag.output['tag_id']}",
+ )
+ # [END howto_operator_gcp_datacatalog_update_tag]
+
+ # Delete
+ # [START howto_operator_gcp_datacatalog_delete_entry]
+ delete_entry = CloudDataCatalogDeleteEntryOperator(
+ task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
+ )
+ # [END howto_operator_gcp_datacatalog_delete_entry]
+ delete_entry.trigger_rule = TriggerRule.ALL_DONE
+
+ # [START howto_operator_gcp_datacatalog_delete_entry_group]
+ delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
+ task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
+ )
+ # [END howto_operator_gcp_datacatalog_delete_entry_group]
+ delete_entry_group.trigger_rule = TriggerRule.ALL_DONE
+
+ # [START howto_operator_gcp_datacatalog_delete_tag]
+ delete_tag = CloudDataCatalogDeleteTagOperator(
+ task_id="delete_tag",
+ location=LOCATION,
+ entry_group=ENTRY_GROUP_ID,
+ entry=ENTRY_ID,
+ tag=create_tag.output["tag_id"],
+ )
+ # [END howto_operator_gcp_datacatalog_delete_tag]
+ delete_tag.trigger_rule = TriggerRule.ALL_DONE
+
+ # [START howto_operator_gcp_datacatalog_delete_tag_template]
+ delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
+ task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
+ )
+ # [END howto_operator_gcp_datacatalog_delete_tag_template]
+ delete_tag_template.trigger_rule = TriggerRule.ALL_DONE
+
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+ )
+
+ (
+ # TEST SETUP
+ create_bucket
+ # TEST BODY
+ >> create_entry_group
+ >> create_entry_group_result
+ >> create_entry_gcs
+ >> create_entry_gcs_result
+ >> create_tag_template
+ >> create_tag_template_result
+ >> create_tag
+ >> create_tag_result
+ >> list_tags
+ >> list_tags_result
+ >> update_tag
+ >> delete_tag
+ >> delete_tag_template
+ >> delete_entry
+ >> delete_entry_group
+ # TEST TEARDOWN
+ >> delete_bucket
+ )
+
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purposes ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)