You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/11/07 23:37:43 UTC
[airflow] branch main updated: Add dataproc metastore operators
(#18945)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 26ad55b Add dataproc metastore operators (#18945)
26ad55b is described below
commit 26ad55beb00f5a0915ba4bec541e3d67044834e9
Author: Wojciech Januszek <wj...@sigma.ug.edu.pl>
AuthorDate: Mon Nov 8 00:37:21 2021 +0100
Add dataproc metastore operators (#18945)
---
.../example_dags/example_dataproc_metastore.py | 216 ++++
.../google/cloud/hooks/dataproc_metastore.py | 676 +++++++++++++
.../google/cloud/operators/dataproc_metastore.py | 1068 ++++++++++++++++++++
airflow/providers/google/provider.yaml | 11 +
.../operators/cloud/dataproc_metastore.rst | 196 ++++
setup.py | 1 +
.../google/cloud/hooks/test_dataproc_metastore.py | 489 +++++++++
.../cloud/operators/test_dataproc_metastore.py | 396 ++++++++
.../operators/test_dataproc_metastore_system.py | 40 +
9 files changed, 3093 insertions(+)
diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py b/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
new file mode 100644
index 0000000..563a044
--- /dev/null
+++ b/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
@@ -0,0 +1,216 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to use various Dataproc Metastore
+operators to manage a service.
+"""
+
+import datetime
+import os
+
+from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.dataproc_metastore import (
+ DataprocMetastoreCreateBackupOperator,
+ DataprocMetastoreCreateMetadataImportOperator,
+ DataprocMetastoreCreateServiceOperator,
+ DataprocMetastoreDeleteBackupOperator,
+ DataprocMetastoreDeleteServiceOperator,
+ DataprocMetastoreExportMetadataOperator,
+ DataprocMetastoreGetServiceOperator,
+ DataprocMetastoreListBackupsOperator,
+ DataprocMetastoreRestoreServiceOperator,
+ DataprocMetastoreUpdateServiceOperator,
+)
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<PROJECT_ID>")
+SERVICE_ID = os.environ.get("GCP_DATAPROC_METASTORE_SERVICE_ID", "dataproc-metastore-system-tests-service-1")
+BACKUP_ID = os.environ.get("GCP_DATAPROC_METASTORE_BACKUP_ID", "dataproc-metastore-system-tests-backup-1")
+REGION = os.environ.get("GCP_REGION", "<REGION>")
+BUCKET = os.environ.get("GCP_DATAPROC_METASTORE_BUCKET", "INVALID BUCKET NAME")
+METADATA_IMPORT_FILE = os.environ.get("GCS_METADATA_IMPORT_FILE", None)
+GCS_URI = os.environ.get("GCS_URI", f"gs://{BUCKET}/data/hive.sql")
+METADATA_IMPORT_ID = "dataproc-metastore-system-tests-metadata-import-1"
+TIMEOUT = 1200
+DB_TYPE = "MYSQL"
+DESTINATION_GCS_FOLDER = f"gs://{BUCKET}/>"
+
+# Service definition
+# Docs: https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services#Service
+# [START how_to_cloud_dataproc_metastore_create_service]
+SERVICE = {
+ "name": "test-service",
+}
+# [END how_to_cloud_dataproc_metastore_create_service]
+
+# Update service
+# [START how_to_cloud_dataproc_metastore_update_service]
+SERVICE_TO_UPDATE = {
+ "labels": {
+ "mylocalmachine": "mylocalmachine",
+ "systemtest": "systemtest",
+ }
+}
+UPDATE_MASK = {"paths": ["labels"]}
+# [END how_to_cloud_dataproc_metastore_update_service]
+
+# Backup definition
+# [START how_to_cloud_dataproc_metastore_create_backup]
+BACKUP = {
+ "name": "test-backup",
+}
+# [END how_to_cloud_dataproc_metastore_create_backup]
+
+# Metadata import definition
+# [START how_to_cloud_dataproc_metastore_create_metadata_import]
+METADATA_IMPORT = {
+ "name": "test-metadata-import",
+ "database_dump": {
+ "gcs_uri": GCS_URI,
+ "database_type": DB_TYPE,
+ },
+}
+# [END how_to_cloud_dataproc_metastore_create_metadata_import]
+
+
+with models.DAG(
+ "example_gcp_dataproc_metastore", start_date=datetime.datetime(2021, 1, 1), schedule_interval="@once"
+) as dag:
+ # [START how_to_cloud_dataproc_metastore_create_service_operator]
+ create_service = DataprocMetastoreCreateServiceOperator(
+ task_id="create_service",
+ region=REGION,
+ project_id=PROJECT_ID,
+ service=SERVICE,
+ service_id=SERVICE_ID,
+ timeout=TIMEOUT,
+ )
+ # [END how_to_cloud_dataproc_metastore_create_service_operator]
+
+ # [START how_to_cloud_dataproc_metastore_get_service_operator]
+ get_service_details = DataprocMetastoreGetServiceOperator(
+ task_id="get_service",
+ region=REGION,
+ project_id=PROJECT_ID,
+ service_id=SERVICE_ID,
+ )
+ # [END how_to_cloud_dataproc_metastore_get_service_operator]
+
+ # [START how_to_cloud_dataproc_metastore_update_service_operator]
+ update_service = DataprocMetastoreUpdateServiceOperator(
+ task_id="update_service",
+ project_id=PROJECT_ID,
+ service_id=SERVICE_ID,
+ region=REGION,
+ service=SERVICE_TO_UPDATE,
+ update_mask=UPDATE_MASK,
+ timeout=TIMEOUT,
+ )
+ # [END how_to_cloud_dataproc_metastore_update_service_operator]
+
+ # [START how_to_cloud_dataproc_metastore_create_metadata_import_operator]
+ import_metadata = DataprocMetastoreCreateMetadataImportOperator(
+ task_id="create_metadata_import",
+ project_id=PROJECT_ID,
+ region=REGION,
+ service_id=SERVICE_ID,
+ metadata_import=METADATA_IMPORT,
+ metadata_import_id=METADATA_IMPORT_ID,
+ timeout=TIMEOUT,
+ )
+ # [END how_to_cloud_dataproc_metastore_create_metadata_import_operator]
+
+ # [START how_to_cloud_dataproc_metastore_export_metadata_operator]
+ export_metadata = DataprocMetastoreExportMetadataOperator(
+ task_id="export_metadata",
+ destination_gcs_folder=DESTINATION_GCS_FOLDER,
+ project_id=PROJECT_ID,
+ region=REGION,
+ service_id=SERVICE_ID,
+ timeout=TIMEOUT,
+ )
+ # [END how_to_cloud_dataproc_metastore_export_metadata_operator]
+
+ # [START how_to_cloud_dataproc_metastore_create_backup_operator]
+ backup_service = DataprocMetastoreCreateBackupOperator(
+ task_id="create_backup",
+ project_id=PROJECT_ID,
+ region=REGION,
+ service_id=SERVICE_ID,
+ backup=BACKUP,
+ backup_id=BACKUP_ID,
+ timeout=TIMEOUT,
+ )
+ # [END how_to_cloud_dataproc_metastore_create_backup_operator]
+
+ # [START how_to_cloud_dataproc_metastore_list_backups_operator]
+ list_backups = DataprocMetastoreListBackupsOperator(
+ task_id="list_backups",
+ project_id=PROJECT_ID,
+ region=REGION,
+ service_id=SERVICE_ID,
+ )
+ # [END how_to_cloud_dataproc_metastore_list_backups_operator]
+
+ # [START how_to_cloud_dataproc_metastore_delete_backup_operator]
+ delete_backup = DataprocMetastoreDeleteBackupOperator(
+ task_id="delete_backup",
+ project_id=PROJECT_ID,
+ region=REGION,
+ service_id=SERVICE_ID,
+ backup_id=BACKUP_ID,
+ timeout=TIMEOUT,
+ )
+ # [END how_to_cloud_dataproc_metastore_delete_backup_operator]
+
+ # [START how_to_cloud_dataproc_metastore_restore_service_operator]
+ restore_service = DataprocMetastoreRestoreServiceOperator(
+ task_id="restore_metastore",
+ region=REGION,
+ project_id=PROJECT_ID,
+ service_id=SERVICE_ID,
+ backup_id=BACKUP_ID,
+ backup_region=REGION,
+ backup_project_id=PROJECT_ID,
+ backup_service_id=SERVICE_ID,
+ timeout=TIMEOUT,
+ )
+ # [END how_to_cloud_dataproc_metastore_restore_service_operator]
+
+ # [START how_to_cloud_dataproc_metastore_delete_service_operator]
+ delete_service = DataprocMetastoreDeleteServiceOperator(
+ task_id="delete_service",
+ region=REGION,
+ project_id=PROJECT_ID,
+ service_id=SERVICE_ID,
+ timeout=TIMEOUT,
+ )
+ # [END how_to_cloud_dataproc_metastore_delete_service_operator]
+
+ chain(
+ create_service,
+ update_service,
+ get_service_details,
+ backup_service,
+ list_backups,
+ restore_service,
+ delete_backup,
+ export_metadata,
+ import_metadata,
+ delete_service,
+ )
diff --git a/airflow/providers/google/cloud/hooks/dataproc_metastore.py b/airflow/providers/google/cloud/hooks/dataproc_metastore.py
new file mode 100644
index 0000000..7a645ff
--- /dev/null
+++ b/airflow/providers/google/cloud/hooks/dataproc_metastore.py
@@ -0,0 +1,676 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Dataproc Metastore hook."""
+
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+from google.api_core.operation import Operation
+from google.api_core.retry import Retry
+from google.cloud.metastore_v1 import DataprocMetastoreClient
+from google.cloud.metastore_v1.types import Backup, MetadataImport, Service
+from google.cloud.metastore_v1.types.metastore import DatabaseDumpSpec, Restore
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+
+class DataprocMetastoreHook(GoogleBaseHook):
+ """Hook for Google Cloud Dataproc Metastore APIs."""
+
+ def get_dataproc_metastore_client(self) -> DataprocMetastoreClient:
+ """Returns DataprocMetastoreClient."""
+ client_options = {'api_endpoint': 'metastore.googleapis.com:443'}
+
+ return DataprocMetastoreClient(
+ credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options
+ )
+
+ def wait_for_operation(self, timeout: float, operation: Operation):
+ """Waits for long-lasting operation to complete."""
+ try:
+ return operation.result(timeout=timeout)
+ except Exception:
+ error = operation.exception(timeout=timeout)
+ raise AirflowException(error)
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def create_backup(
+ self,
+ project_id: str,
+ region: str,
+ service_id: str,
+ backup: Backup,
+ backup_id: str,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ):
+ """
+ Creates a new backup in a given project and location.
+
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param backup: Required. The backup to create. The ``name`` field is ignored. The ID of the created
+ backup must be provided in the request's ``backup_id`` field.
+
+ This corresponds to the ``backup`` field on the ``request`` instance; if ``request`` is provided,
+ this should not be set.
+ :type backup: google.cloud.metastore_v1.types.Backup
+ :param backup_id: Required. The ID of the backup, which is used as the final component of the
+ backup's name. This value must be between 1 and 64 characters long, begin with a letter, end with
+ a letter or number, and consist of alphanumeric ASCII characters or hyphens.
+
+ This corresponds to the ``backup_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type backup_id: str
+ :param request_id: Optional. A unique id used to identify the request.
+ :type request_id: str
+ :param retry: Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The timeout for this request.
+ :type timeout: float
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ parent = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+ client = self.get_dataproc_metastore_client()
+ result = client.create_backup(
+ request={
+ 'parent': parent,
+ 'backup': backup,
+ 'backup_id': backup_id,
+ 'request_id': request_id,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def create_metadata_import(
+ self,
+ project_id: str,
+ region: str,
+ service_id: str,
+ metadata_import: MetadataImport,
+ metadata_import_id: str,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ):
+ """
+ Creates a new MetadataImport in a given project and location.
+
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param metadata_import: Required. The metadata import to create. The ``name`` field is ignored. The
+ ID of the created metadata import must be provided in the request's ``metadata_import_id`` field.
+
+ This corresponds to the ``metadata_import`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type metadata_import: google.cloud.metastore_v1.types.MetadataImport
+ :param metadata_import_id: Required. The ID of the metadata import, which is used as the final
+ component of the metadata import's name. This value must be between 1 and 64 characters long,
+ begin with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``metadata_import_id`` field on the ``request`` instance; if ``request``
+ is provided, this should not be set.
+ :type metadata_import_id: str
+ :param request_id: Optional. A unique id used to identify the request.
+ :type request_id: str
+ :param retry: Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The timeout for this request.
+ :type timeout: float
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ parent = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+ client = self.get_dataproc_metastore_client()
+ result = client.create_metadata_import(
+ request={
+ 'parent': parent,
+ 'metadata_import': metadata_import,
+ 'metadata_import_id': metadata_import_id,
+ 'request_id': request_id,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def create_service(
+ self,
+ region: str,
+ project_id: str,
+ service: Union[Dict, Service],
+ service_id: str,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = (),
+ ):
+ """
+ Creates a metastore service in a project and location.
+
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param service: Required. The Metastore service to create. The ``name`` field is ignored. The ID of
+ the created metastore service must be provided in the request's ``service_id`` field.
+
+ This corresponds to the ``service`` field on the ``request`` instance; if ``request`` is provided,
+ this should not be set.
+ :type service: google.cloud.metastore_v1.types.Service
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param request_id: Optional. A unique id used to identify the request.
+ :type request_id: str
+ :param retry: Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The timeout for this request.
+ :type timeout: float
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ parent = f'projects/{project_id}/locations/{region}'
+
+ client = self.get_dataproc_metastore_client()
+ result = client.create_service(
+ request={
+ 'parent': parent,
+ 'service_id': service_id,
+ 'service': service if service else {},
+ 'request_id': request_id,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def delete_backup(
+ self,
+ project_id: str,
+ region: str,
+ service_id: str,
+ backup_id: str,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ):
+ """
+ Deletes a single backup.
+
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param backup_id: Required. The ID of the backup, which is used as the final component of the
+ backup's name. This value must be between 1 and 64 characters long, begin with a letter, end with
+ a letter or number, and consist of alphanumeric ASCII characters or hyphens.
+
+ This corresponds to the ``backup_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type backup_id: str
+ :param request_id: Optional. A unique id used to identify the request.
+ :type request_id: str
+ :param retry: Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The timeout for this request.
+ :type timeout: float
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ name = f'projects/{project_id}/locations/{region}/services/{service_id}/backups/{backup_id}'
+
+ client = self.get_dataproc_metastore_client()
+ result = client.delete_backup(
+ request={
+ 'name': name,
+ 'request_id': request_id,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def delete_service(
+ self,
+ project_id: str,
+ region: str,
+ service_id: str,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ):
+ """
+ Deletes a single service.
+
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param request_id: Optional. A unique id used to identify the request.
+ :type request_id: str
+ :param retry: Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The timeout for this request.
+ :type timeout: float
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ name = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+ client = self.get_dataproc_metastore_client()
+ result = client.delete_service(
+ request={
+ 'name': name,
+ 'request_id': request_id,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def export_metadata(
+ self,
+ destination_gcs_folder: str,
+ project_id: str,
+ region: str,
+ service_id: str,
+ request_id: Optional[str] = None,
+ database_dump_type: Optional[DatabaseDumpSpec] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ):
+ """
+ Exports metadata from a service.
+
+ :param destination_gcs_folder: A Cloud Storage URI of a folder, in the format
+ ``gs://<bucket_name>/<path_inside_bucket>``. A sub-folder
+ ``<export_folder>`` containing exported files will be
+ created below it.
+ :type destination_gcs_folder: str
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param request_id: Optional. A unique id used to identify the request.
+ :type request_id: str
+ :param database_dump_type: Optional. The type of the database dump. If unspecified,
+ defaults to ``MYSQL``.
+ :type database_dump_type: google.cloud.metastore_v1.types.DatabaseDumpSpec.Type
+ :param retry: Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The timeout for this request.
+ :type timeout: float
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ service = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+ client = self.get_dataproc_metastore_client()
+ result = client.export_metadata(
+ request={
+ 'destination_gcs_folder': destination_gcs_folder,
+ 'service': service,
+ 'request_id': request_id,
+ 'database_dump_type': database_dump_type,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def get_service(
+ self,
+ project_id: str,
+ region: str,
+ service_id: str,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ):
+ """
+ Gets the details of a single service.
+
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param retry: Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The timeout for this request.
+ :type timeout: float
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ name = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+ client = self.get_dataproc_metastore_client()
+ result = client.get_service(
+ request={
+ 'name': name,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def list_backups(
+ self,
+ project_id: str,
+ region: str,
+ service_id: str,
+ page_size: Optional[int] = None,
+ page_token: Optional[str] = None,
+ filter: Optional[str] = None,
+ order_by: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ):
+ """
+ Lists backups in a service.
+
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param page_size: Optional. The maximum number of backups to
+ return. The response may contain less than the
+ maximum number. If unspecified, no more than 500
+ backups are returned. The maximum value is 1000;
+ values above 1000 are changed to 1000.
+ :type page_size: int
+ :param page_token: Optional. A page token, received from a previous
+ [DataprocMetastore.ListBackups][google.cloud.metastore.v1.DataprocMetastore.ListBackups]
+ call. Provide this token to retrieve the subsequent page.
+ To retrieve the first page, supply an empty page token.
+ When paginating, other parameters provided to
+ [DataprocMetastore.ListBackups][google.cloud.metastore.v1.DataprocMetastore.ListBackups]
+ must match the call that provided the page token.
+ :type page_token: str
+ :param filter: Optional. The filter to apply to list
+ results.
+ :type filter: str
+ :param order_by: Optional. Specify the ordering of results as described in
+ `Sorting
+ Order <https://cloud.google.com/apis/design/design_patterns#sorting_order>`__.
+ If not specified, the results will be sorted in the default
+ order.
+ :type order_by: str
+ :param retry: Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The timeout for this request.
+ :type timeout: float
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ parent = f'projects/{project_id}/locations/{region}/services/{service_id}/backups'
+
+ client = self.get_dataproc_metastore_client()
+ result = client.list_backups(
+ request={
+ 'parent': parent,
+ 'page_size': page_size,
+ 'page_token': page_token,
+ 'filter': filter,
+ 'order_by': order_by,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def restore_service(
+ self,
+ project_id: str,
+ region: str,
+ service_id: str,
+ backup_project_id: str,
+ backup_region: str,
+ backup_service_id: str,
+ backup_id: str,
+ restore_type: Optional[Restore] = None,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ):
+ """
+ Restores a service from a backup.
+
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param backup_project_id: Required. The ID of the Google Cloud project that the metastore service
+ backup to restore from.
+ :type backup_project_id: str
+ :param backup_region: Required. The ID of the Google Cloud region that the metastore
+ service backup to restore from.
+ :type backup_region: str
+ :param backup_service_id: Required. The ID of the metastore service backup to restore from,
+ which is used as the final component of the metastore service's name. This value must be
+ between 2 and 63 characters long inclusive, begin with a letter, end with a letter or number,
+ and consist of alphanumeric ASCII characters or hyphens.
+ :type backup_service_id: str
+ :param backup_id: Required. The ID of the metastore service backup to restore from
+ :type backup_id: str
+ :param restore_type: Optional. The type of restore. If unspecified, defaults to
+ ``METADATA_ONLY``
+ :type restore_type: google.cloud.metastore_v1.types.Restore.RestoreType
+ :param request_id: Optional. A unique id used to identify the request.
+ :type request_id: str
+ :param retry: Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The timeout for this request.
+ :type timeout: float
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ service = f'projects/{project_id}/locations/{region}/services/{service_id}'
+ backup = (
+ f'projects/{backup_project_id}/locations/{backup_region}/services/'
+ f'{backup_service_id}/backups/{backup_id}'
+ )
+
+ client = self.get_dataproc_metastore_client()
+ result = client.restore_service(
+ request={
+ 'service': service,
+ 'backup': backup,
+ 'restore_type': restore_type,
+ 'request_id': request_id,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
+
+ @GoogleBaseHook.fallback_to_default_project_id
+ def update_service(
+ self,
+ project_id: str,
+ region: str,
+ service_id: str,
+ service: Union[Dict, Service],
+ update_mask: FieldMask,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = None,
+ ):
+ """
+ Updates the parameters of a single service.
+
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param service: Required. The metastore service to update. The server only merges fields in the
+ service if they are specified in ``update_mask``.
+
+ The metastore service's ``name`` field is used to identify the metastore service to be updated.
+
+ This corresponds to the ``service`` field on the ``request`` instance; if ``request`` is provided,
+ this should not be set.
+ :type service: Union[Dict, google.cloud.metastore_v1.types.Service]
+ :param update_mask: Required. A field mask used to specify the fields to be overwritten in the
+ metastore service resource by the update. Fields specified in the ``update_mask`` are relative to
+ the resource (not to the full request). A field is overwritten if it is in the mask.
+
+ This corresponds to the ``update_mask`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type update_mask: google.protobuf.field_mask_pb2.FieldMask
+ :param request_id: Optional. A unique id used to identify the request.
+ :type request_id: str
+ :param retry: Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: The timeout for this request.
+ :type timeout: float
+ :param metadata: Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ """
+ client = self.get_dataproc_metastore_client()
+
+ service_name = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+ service["name"] = service_name
+
+ result = client.update_service(
+ request={
+ 'service': service,
+ 'update_mask': update_mask,
+ 'request_id': request_id,
+ },
+ retry=retry,
+ timeout=timeout,
+ metadata=metadata,
+ )
+ return result
diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py b/airflow/providers/google/cloud/operators/dataproc_metastore.py
new file mode 100644
index 0000000..2823b72
--- /dev/null
+++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -0,0 +1,1068 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Google Dataproc Metastore operators."""
+
+from time import sleep
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+from google.api_core.retry import Retry, exponential_sleep_generator
+from google.cloud.metastore_v1 import MetadataExport, MetadataManagementActivity
+from google.cloud.metastore_v1.types import Backup, MetadataImport, Service
+from google.cloud.metastore_v1.types.metastore import DatabaseDumpSpec, Restore
+from google.protobuf.field_mask_pb2 import FieldMask
+from googleapiclient.errors import HttpError
+
+from airflow import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook
+
+
+class DataprocMetastoreCreateBackupOperator(BaseOperator):
+    """
+    Create a backup of a Dataproc Metastore service and return it as a dictionary.
+
+    When the create request fails with an HTTP 409 status, the backup is treated as already existing:
+    it is fetched through the hook's ``get_backup`` and returned instead.
+
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param backup: Required. The backup to create. The ``name`` field is ignored. The ID of the created
+        backup must be provided in the request's ``backup_id`` field.
+
+        This corresponds to the ``backup`` field on the ``request`` instance; if ``request`` is provided,
+        this should not be set.
+    :type backup: Union[Dict, google.cloud.metastore_v1.types.Backup]
+    :param backup_id: Required. The ID of the backup, which is used as the final component of the backup's
+        name. This value must be between 1 and 64 characters long, begin with a letter, end with a letter
+        or number, and consist of alphanumeric ASCII characters or hyphens.
+
+        This corresponds to the ``backup_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type backup_id: str
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request. The same value is also passed to
+        ``wait_for_operation``.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'backup',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {'backup': 'json'}
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        service_id: str,
+        backup: Union[Dict, Backup],
+        backup_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        # Where the backup lives.
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        # What to create.
+        self.backup = backup
+        self.backup_id = backup_id
+        self.request_id = request_id
+        # Per-call API options.
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        # Connection settings handed to the hook.
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: dict) -> dict:
+        """Create the backup (or fetch it on HTTP 409) and return ``Backup.to_dict`` of the result."""
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Creating Dataproc Metastore backup: %s", self.backup_id)
+
+        # Arguments shared by the create request and the fallback lookup.
+        backup_ref = {
+            'project_id': self.project_id,
+            'region': self.region,
+            'service_id': self.service_id,
+            'backup_id': self.backup_id,
+            'retry': self.retry,
+            'timeout': self.timeout,
+            'metadata': self.metadata,
+        }
+
+        try:
+            operation = hook.create_backup(backup=self.backup, request_id=self.request_id, **backup_ref)
+            result = hook.wait_for_operation(self.timeout, operation)
+            self.log.info("Backup %s created successfully", self.backup_id)
+        except HttpError as err:
+            # Only a 409 status is handled here; every other HTTP error propagates.
+            if err.resp.status not in (409, '409'):
+                raise
+            self.log.info("Backup %s already exists", self.backup_id)
+            result = hook.get_backup(**backup_ref)
+        return Backup.to_dict(result)
+
+
+class DataprocMetastoreCreateMetadataImportOperator(BaseOperator):
+    """
+    Create a MetadataImport for a Dataproc Metastore service and return it as a dictionary.
+
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param metadata_import: Required. The metadata import to create. The ``name`` field is ignored. The ID
+        of the created metadata import must be provided in the request's ``metadata_import_id`` field.
+
+        This corresponds to the ``metadata_import`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type metadata_import: google.cloud.metastore_v1.types.MetadataImport
+    :param metadata_import_id: Required. The ID of the metadata import, which is used as the final
+        component of the metadata import's name. This value must be between 1 and 64 characters long,
+        begin with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``metadata_import_id`` field on the ``request`` instance; if ``request``
+        is provided, this should not be set.
+    :type metadata_import_id: str
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request. The same value is also passed to
+        ``wait_for_operation``.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'metadata_import',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {'metadata_import': 'json'}
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        service_id: str,
+        metadata_import: MetadataImport,
+        metadata_import_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        # Target service.
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        # Import definition.
+        self.metadata_import = metadata_import
+        self.metadata_import_id = metadata_import_id
+        self.request_id = request_id
+        # Per-call API options.
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        # Connection settings handed to the hook.
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: dict):
+        """Start the import, wait for the operation and return ``MetadataImport.to_dict`` of the result."""
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Creating Dataproc Metastore metadata import: %s", self.metadata_import_id)
+
+        create_args = {
+            'project_id': self.project_id,
+            'region': self.region,
+            'service_id': self.service_id,
+            'metadata_import': self.metadata_import,
+            'metadata_import_id': self.metadata_import_id,
+            'request_id': self.request_id,
+            'retry': self.retry,
+            'timeout': self.timeout,
+            'metadata': self.metadata,
+        }
+        operation = hook.create_metadata_import(**create_args)
+        created_import = hook.wait_for_operation(self.timeout, operation)
+
+        self.log.info("Metadata import %s created successfully", self.metadata_import_id)
+        return MetadataImport.to_dict(created_import)
+
+
+class DataprocMetastoreCreateServiceOperator(BaseOperator):
+    """
+    Creates a metastore service in a project and location.
+
+    If the create request fails with an HTTP 409 status, the service is treated as already existing:
+    it is fetched through the hook's ``get_service`` and returned instead.
+
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param service: Required. The Metastore service to create. The ``name`` field is ignored. The ID of
+        the created metastore service must be provided in the request's ``service_id`` field.
+        Note that the constructor nevertheless declares this argument with a default of ``None``.
+
+        This corresponds to the ``service`` field on the ``request`` instance; if ``request`` is provided,
+        this should not be set.
+    :type service: Union[Dict, google.cloud.metastore_v1.types.Service]
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request. The same value is also passed to
+        ``wait_for_operation``.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'service',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {'service': 'json'}
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: str,
+        service: Optional[Union[Dict, Service]] = None,
+        service_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.service = service
+        self.service_id = service_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context) -> dict:
+        """Create the service (or fetch it on HTTP 409) and return ``Service.to_dict`` of the result."""
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        # Log the service being created, not the project it lives in.
+        self.log.info("Creating Dataproc Metastore service: %s", self.service_id)
+        try:
+            operation = hook.create_service(
+                region=self.region,
+                project_id=self.project_id,
+                service=self.service,
+                service_id=self.service_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            service = hook.wait_for_operation(self.timeout, operation)
+            self.log.info("Service %s created successfully", self.service_id)
+        except HttpError as err:
+            # Only a 409 status is handled here; every other HTTP error propagates.
+            if err.resp.status not in (409, '409'):
+                raise
+            self.log.info("Service %s already exists", self.service_id)
+            service = hook.get_service(
+                region=self.region,
+                project_id=self.project_id,
+                service_id=self.service_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+        return Service.to_dict(service)
+
+
+class DataprocMetastoreDeleteBackupOperator(BaseOperator):
+    """
+    Deletes a single backup.
+
+    :param project_id: Required. The ID of the Google Cloud project that the backup belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the backup belongs to.
+    :type region: str
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param backup_id: Required. The ID of the backup, which is used as the final component of the backup's
+        name. This value must be between 1 and 64 characters long, begin with a letter, end with a letter
+        or number, and consist of alphanumeric ASCII characters or hyphens.
+
+        This corresponds to the ``backup_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type backup_id: str
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request. The same value is also passed to
+        ``wait_for_operation``.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        service_id: str,
+        backup_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        self.backup_id = backup_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: dict) -> None:
+        """Request deletion of the backup and wait for the returned operation to finish."""
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Deleting Dataproc Metastore backup: %s", self.backup_id)
+        operation = hook.delete_backup(
+            project_id=self.project_id,
+            region=self.region,
+            service_id=self.service_id,
+            backup_id=self.backup_id,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        hook.wait_for_operation(self.timeout, operation)
+        # Report the backup that was deleted, matching the "Deleting ..." message above.
+        self.log.info("Backup %s deleted successfully", self.backup_id)
+
+
+class DataprocMetastoreDeleteServiceOperator(BaseOperator):
+    """
+    Deletes a single service.
+
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+    :type service_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request. The same value is also passed to
+        ``wait_for_operation``.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: str,
+        service_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.service_id = service_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context) -> None:
+        """Request deletion of the service and wait for the returned operation to finish."""
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        # Both messages identify the service being deleted, not the project it lives in.
+        self.log.info("Deleting Dataproc Metastore service: %s", self.service_id)
+        operation = hook.delete_service(
+            region=self.region,
+            project_id=self.project_id,
+            service_id=self.service_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        hook.wait_for_operation(self.timeout, operation)
+        self.log.info("Service %s deleted successfully", self.service_id)
+
+
+class DataprocMetastoreExportMetadataOperator(BaseOperator):
+    """
+    Exports metadata from a service.
+
+    The result of the export call itself is not used. Completion is detected by polling the service
+    and inspecting its metadata management activity (see ``_wait_for_export_metadata``).
+
+    :param destination_gcs_folder: A Cloud Storage URI of a folder, in the format
+        ``gs://<bucket_name>/<path_inside_bucket>``. A sub-folder
+        ``<export_folder>`` containing exported files will be
+        created below it.
+    :type destination_gcs_folder: str
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param database_dump_type: Optional. Forwarded unchanged to the hook's ``export_metadata`` call;
+        presumably selects the type of the database dump (confirm against the hook).
+    :type database_dump_type: google.cloud.metastore_v1.types.metastore.DatabaseDumpSpec
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        destination_gcs_folder: str,
+        project_id: str,
+        region: str,
+        service_id: str,
+        request_id: Optional[str] = None,
+        database_dump_type: Optional[DatabaseDumpSpec] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.destination_gcs_folder = destination_gcs_folder
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        self.request_id = request_id
+        self.database_dump_type = database_dump_type
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Dict):
+        """Start the export, wait until it succeeds and return ``MetadataExport.to_dict`` of it."""
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Exporting metadata from Dataproc Metastore service: %s", self.service_id)
+        hook.export_metadata(
+            destination_gcs_folder=self.destination_gcs_folder,
+            project_id=self.project_id,
+            region=self.region,
+            service_id=self.service_id,
+            request_id=self.request_id,
+            database_dump_type=self.database_dump_type,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        metadata_export = self._wait_for_export_metadata(hook)
+        self.log.info("Metadata from service %s exported successfully", self.service_id)
+        return MetadataExport.to_dict(metadata_export)
+
+    def _wait_for_export_metadata(self, hook: DataprocMetastoreHook):
+        """
+        Poll the service until its first listed metadata export has succeeded or failed.
+
+        This is a workaround: we discovered an issue with parsing the operation result into a
+        ``MetadataExport`` inside the SDK, so the export state is read from the service instead.
+
+        :param hook: The hook used to fetch the service.
+        :return: The ``MetadataExport`` once its state is ``SUCCEEDED``.
+        :raises AirflowException: If the export state is ``FAILED``.
+        """
+        # The generator never ends, so this loop exits only via return or raise.
+        for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
+            sleep(time_to_wait)
+            service = hook.get_service(
+                region=self.region,
+                project_id=self.project_id,
+                service_id=self.service_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            activities: MetadataManagementActivity = service.metadata_management_activity
+            exports = activities.metadata_exports
+            if not exports:
+                # No export is listed on the service yet; keep polling instead of failing
+                # with an IndexError on the lookup below.
+                continue
+            metadata_export: MetadataExport = exports[0]
+            if metadata_export.state == MetadataExport.State.SUCCEEDED:
+                return metadata_export
+            if metadata_export.state == MetadataExport.State.FAILED:
+                raise AirflowException(
+                    f"Exporting metadata from Dataproc Metastore {metadata_export.name} FAILED"
+                )
+
+
+class DataprocMetastoreGetServiceOperator(BaseOperator):
+    """
+    Gets the details of a single service.
+
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: str,
+        service_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.service_id = service_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context) -> dict:
+        """Fetch the service through the hook and return ``Service.to_dict`` of it."""
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        # Log the service being fetched, not the project it lives in.
+        self.log.info("Gets the details of a single Dataproc Metastore service: %s", self.service_id)
+        result = hook.get_service(
+            region=self.region,
+            project_id=self.project_id,
+            service_id=self.service_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Service.to_dict(result)
+
+
+class DataprocMetastoreListBackupsOperator(BaseOperator):
+    """
+    Lists backups in a service.
+
+    :param project_id: Required. The ID of the Google Cloud project that the backup belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the backup belongs to.
+    :type region: str
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param page_size: Optional. Forwarded unchanged to the hook's ``list_backups`` call; presumably the
+        maximum number of backups to return per page.
+    :type page_size: int
+    :param page_token: Optional. Forwarded unchanged to the hook's ``list_backups`` call; presumably a
+        page token received from a previous list call.
+    :type page_token: str
+    :param filter: Optional. Forwarded unchanged to the hook's ``list_backups`` call; presumably the
+        filter to apply to the list results.
+    :type filter: str
+    :param order_by: Optional. Forwarded unchanged to the hook's ``list_backups`` call; presumably the
+        ordering of the results.
+    :type order_by: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        service_id: str,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        filter: Optional[str] = None,
+        order_by: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        self.page_size = page_size
+        self.page_token = page_token
+        # ``filter`` shadows the builtin, but it is part of the public signature and must keep its name.
+        self.filter = filter
+        self.order_by = order_by
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: dict) -> list:
+        """Fetch the backups through the hook and return them as a list of dictionaries."""
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Listing Dataproc Metastore backups: %s", self.service_id)
+        backups = hook.list_backups(
+            project_id=self.project_id,
+            region=self.region,
+            service_id=self.service_id,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            filter=self.filter,
+            order_by=self.order_by,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        # One dictionary per backup yielded by the hook.
+        return [Backup.to_dict(backup) for backup in backups]
+
+
+class DataprocMetastoreRestoreServiceOperator(BaseOperator):
+ """
+ Restores a service from a backup.
+
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param backup_project_id: Required. The ID of the Google Cloud project that the metastore
+ service backup to restore from.
+ :type backup_project_id: str
+ :param backup_region: Required. The ID of the Google Cloud region that the metastore
+ service backup to restore from.
+ :type backup_region: str
+ :param backup_service_id: Required. The ID of the metastore service backup to restore from, which is
+ used as the final component of the metastore service's name. This value must be between 2 and 63
+ characters long inclusive, begin with a letter, end with a letter or number, and consist
+ of alphanumeric ASCII characters or hyphens.
+ :type backup_service_id: str
+ :param backup_id: Required. The ID of the metastore service backup to restore from
+ :type backup_id: str
+ :param restore_type: Optional. The type of restore. If unspecified, defaults to
+ ``METADATA_ONLY``
+ :type restore_type: google.cloud.metastore_v1.types.Restore.RestoreType
+ :param request_id: Optional. A unique id used to identify the request.
+ :type request_id: str
+ :param retry: Optional. Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: Optional. The timeout for this request.
+ :type timeout: float
+ :param metadata: Optional. Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = (
+ 'project_id',
+ 'impersonation_chain',
+ )
+
+ def __init__(
+ self,
+ *,
+ project_id: str,
+ region: str,
+ service_id: str,
+ backup_project_id: str,
+ backup_region: str,
+ backup_service_id: str,
+ backup_id: str,
+ restore_type: Optional[Restore] = None,
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.service_id = service_id
+ self.backup_project_id = backup_project_id
+ self.backup_region = backup_region
+ self.backup_service_id = backup_service_id
+ self.backup_id = backup_id
+ self.restore_type = restore_type
+ self.request_id = request_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context) -> dict:
+ hook = DataprocMetastoreHook(
+ gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+ )
+ self.log.info(
+ "Restoring Dataproc Metastore service: %s from backup: %s", self.service_id, self.backup_id
+ )
+ hook.restore_service(
+ project_id=self.project_id,
+ region=self.region,
+ service_id=self.service_id,
+ backup_project_id=self.backup_project_id,
+ backup_region=self.backup_region,
+ backup_service_id=self.backup_service_id,
+ backup_id=self.backup_id,
+ restore_type=self.restore_type,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ self._wait_for_restore_service(hook)
+ self.log.info("Service %s restored from backup %s", self.service_id, self.backup_id)
+
+ def _wait_for_restore_service(self, hook: DataprocMetastoreHook):
+ """
+ Workaround to check that restore service was finished successfully.
+ We discovered an issue to parse result to Restore inside the SDK
+ """
+ for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
+ sleep(time_to_wait)
+ service = hook.get_service(
+ region=self.region,
+ project_id=self.project_id,
+ service_id=self.service_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ activities: MetadataManagementActivity = service.metadata_management_activity
+ restore_service: Restore = activities.restores[0]
+ if restore_service.state == Restore.State.SUCCEEDED:
+ return restore_service
+ if restore_service.state == Restore.State.FAILED:
+ raise AirflowException("Restoring service FAILED")
+
+
+class DataprocMetastoreUpdateServiceOperator(BaseOperator):
+ """
+ Updates the parameters of a single service.
+
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :type project_id: str
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :type region: str
+ :param service_id: Required. The ID of the metastore service, which is used as the final component of
+ the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+ with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+ hyphens.
+
+ This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+ provided, this should not be set.
+ :type service_id: str
+ :param service: Required. The metastore service to update. The server only merges fields in the service
+ if they are specified in ``update_mask``.
+
+ The metastore service's ``name`` field is used to identify the metastore service to be updated.
+
+ This corresponds to the ``service`` field on the ``request`` instance; if ``request`` is provided,
+ this should not be set.
+ :type service: Union[Dict, google.cloud.metastore_v1.types.Service]
+ :param update_mask: Required. A field mask used to specify the fields to be overwritten in the metastore
+ service resource by the update. Fields specified in the ``update_mask`` are relative to the resource
+ (not to the full request). A field is overwritten if it is in the mask.
+
+ This corresponds to the ``update_mask`` field on the ``request`` instance; if ``request`` is provided,
+ this should not be set.
+ :type update_mask: google.protobuf.field_mask_pb2.FieldMask
+ :param request_id: Optional. A unique id used to identify the request.
+ :type request_id: str
+ :param retry: Optional. Designation of what errors, if any, should be retried.
+ :type retry: google.api_core.retry.Retry
+ :param timeout: Optional. The timeout for this request.
+ :type timeout: float
+ :param metadata: Optional. Strings which should be sent along with the request as metadata.
+ :type metadata: Sequence[Tuple[str, str]]
+ :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+ :type gcp_conn_id: str
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ :type impersonation_chain: Union[str, Sequence[str]]
+ """
+
+ template_fields = (
+ 'project_id',
+ 'impersonation_chain',
+ )
+
+ def __init__(
+ self,
+ *,
+ project_id: str,
+ region: str,
+ service_id: str,
+ service: Union[Dict, Service],
+ update_mask: Union[Dict, FieldMask],
+ request_id: Optional[str] = None,
+ retry: Optional[Retry] = None,
+ timeout: Optional[float] = None,
+ metadata: Optional[Sequence[Tuple[str, str]]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.service_id = service_id
+ self.service = service
+ self.update_mask = update_mask
+ self.request_id = request_id
+ self.retry = retry
+ self.timeout = timeout
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Dict):
+ hook = DataprocMetastoreHook(
+ gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+ )
+ self.log.info("Updating Dataproc Metastore service: %s", self.service.get("name"))
+
+ operation = hook.update_service(
+ project_id=self.project_id,
+ region=self.region,
+ service_id=self.service_id,
+ service=self.service,
+ update_mask=self.update_mask,
+ request_id=self.request_id,
+ retry=self.retry,
+ timeout=self.timeout,
+ metadata=self.metadata,
+ )
+ hook.wait_for_operation(self.timeout, operation)
+ self.log.info("Service %s updated successfully", self.service.get("name"))
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index ef842f0..ff1c896 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -230,6 +230,11 @@ integrations:
- /docs/apache-airflow-providers-google/operators/cloud/dataprep.rst
logo: /integration-logos/gcp/Google-Dataprep.png
tags: [gcp]
+ - integration-name: Google Dataproc Metastore
+ external-doc-url: https://cloud.google.com/dataproc-metastore/
+ how-to-guide:
+ - /docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
+ tags: [gcp]
- integration-name: Google Dataproc
external-doc-url: https://cloud.google.com/dataproc/
how-to-guide:
@@ -368,6 +373,9 @@ operators:
- integration-name: Google Dataprep
python-modules:
- airflow.providers.google.cloud.operators.dataprep
+ - integration-name: Google Dataproc Metastore
+ python-modules:
+ - airflow.providers.google.cloud.operators.dataproc_metastore
- integration-name: Google Dataproc
python-modules:
- airflow.providers.google.cloud.operators.dataproc
@@ -537,6 +545,9 @@ hooks:
- integration-name: Google Dataprep
python-modules:
- airflow.providers.google.cloud.hooks.dataprep
+ - integration-name: Google Dataproc Metastore
+ python-modules:
+ - airflow.providers.google.cloud.hooks.dataproc_metastore
- integration-name: Google Dataproc
python-modules:
- airflow.providers.google.cloud.hooks.dataproc
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
new file mode 100644
index 0000000..c7ff530
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
@@ -0,0 +1,196 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Google Cloud Dataproc Metastore Operators
+=========================================
+
+Dataproc Metastore is a fully managed, highly available, auto-healing serverless
+Apache Hive metastore (HMS) that runs on Google Cloud. It supports HMS, serves as
+a critical component for managing the metadata of relational entities,
+and provides interoperability between data processing applications in the open source data ecosystem.
+
+For more information about the service visit `Dataproc Metastore product documentation <https://cloud.google.com/dataproc-metastore/docs/reference>`__
+
+Create a Service
+----------------
+
+Before you create a dataproc metastore service you need to define the service.
+For more information about the available fields to pass when creating a service, visit `Dataproc Metastore create service API. <https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services#Service>`__
+
+A simple service configuration can look as follows:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_metastore_create_service]
+ :end-before: [END how_to_cloud_dataproc_metastore_create_service]
+
+With this configuration we can create the service:
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateServiceOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_metastore_create_service_operator]
+ :end-before: [END how_to_cloud_dataproc_metastore_create_service_operator]
+
+Get a service
+-------------
+
+To get a service you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreGetServiceOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_metastore_get_service_operator]
+ :end-before: [END how_to_cloud_dataproc_metastore_get_service_operator]
+
+Update a service
+----------------
+You can update the service by providing a service config and an updateMask.
+In the updateMask argument you specify the path, relative to Service, of the field to update.
+For more information on updateMask and other parameters take a look at `Dataproc Metastore update service API. <https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services/patch>`__
+
+An example of a new service config and the updateMask:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_metastore_update_service]
+ :end-before: [END how_to_cloud_dataproc_metastore_update_service]
+
+To update a service you can use:
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreUpdateServiceOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_metastore_update_service_operator]
+ :end-before: [END how_to_cloud_dataproc_metastore_update_service_operator]
+
+Delete a service
+----------------
+
+To delete a service you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteServiceOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_metastore_delete_service_operator]
+ :end-before: [END how_to_cloud_dataproc_metastore_delete_service_operator]
+
+Export a service metadata
+-------------------------
+
+To export metadata you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreExportMetadataOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_metastore_export_metadata_operator]
+ :end-before: [END how_to_cloud_dataproc_metastore_export_metadata_operator]
+
+Restore a service
+-----------------
+
+To restore a service you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreRestoreServiceOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_metastore_restore_service_operator]
+ :end-before: [END how_to_cloud_dataproc_metastore_restore_service_operator]
+
+Create a metadata import
+------------------------
+
+Before you create a dataproc metastore metadata import you need to define the metadata import.
+For more information about the available fields to pass when creating a metadata import, visit `Dataproc Metastore create metadata import API. <https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services.metadataImports#MetadataImport>`__
+
+A simple metadata import configuration can look as follows:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_metastore_create_metadata_import]
+ :end-before: [END how_to_cloud_dataproc_metastore_create_metadata_import]
+
+To create a metadata import you can use:
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateMetadataImportOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_metastore_create_metadata_import_operator]
+ :end-before: [END how_to_cloud_dataproc_metastore_create_metadata_import_operator]
+
+Create a Backup
+---------------
+
+Before you create a dataproc metastore backup of the service you need to define the backup.
+For more information about the available fields to pass when creating a backup, visit `Dataproc Metastore create backup API. <https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services.backups#Backup>`__
+
+A simple backup configuration can look as follows:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 0
+ :start-after: [START how_to_cloud_dataproc_metastore_create_backup]
+ :end-before: [END how_to_cloud_dataproc_metastore_create_backup]
+
+With this configuration we can create the backup:
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateBackupOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_metastore_create_backup_operator]
+ :end-before: [END how_to_cloud_dataproc_metastore_create_backup_operator]
+
+Delete a backup
+---------------
+
+To delete a backup you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteBackupOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_metastore_delete_backup_operator]
+ :end-before: [END how_to_cloud_dataproc_metastore_delete_backup_operator]
+
+List backups
+------------
+
+To list backups you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreListBackupsOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+ :language: python
+ :dedent: 4
+ :start-after: [START how_to_cloud_dataproc_metastore_list_backups_operator]
+ :end-before: [END how_to_cloud_dataproc_metastore_list_backups_operator]
diff --git a/setup.py b/setup.py
index 70b715b..7542a7d 100644
--- a/setup.py
+++ b/setup.py
@@ -307,6 +307,7 @@ google = [
'google-cloud-container>=0.1.1,<2.0.0',
'google-cloud-datacatalog>=3.0.0,<4.0.0',
'google-cloud-dataproc>=2.2.0,<4.0.0',
+ 'google-cloud-dataproc-metastore>=1.2.0,<2.0.0',
'google-cloud-dlp>=0.11.0,<2.0.0',
'google-cloud-kms>=2.0.0,<3.0.0',
'google-cloud-language>=1.1.1,<2.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_dataproc_metastore.py b/tests/providers/google/cloud/hooks/test_dataproc_metastore.py
new file mode 100644
index 0000000..cd8602c
--- /dev/null
+++ b/tests/providers/google/cloud/hooks/test_dataproc_metastore.py
@@ -0,0 +1,489 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import TestCase, mock
+
+from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook
+from tests.providers.google.cloud.utils.base_gcp_mock import (
+ mock_base_gcp_hook_default_project_id,
+ mock_base_gcp_hook_no_default_project_id,
+)
+
+# Identifiers passed to the hook methods under test.
+TEST_GCP_CONN_ID: str = "test-gcp-conn-id"
+TEST_REGION: str = "test-region"
+TEST_PROJECT_ID: str = "test-project-id"
+TEST_BACKUP: str = "test-backup"
+TEST_BACKUP_ID: str = "test-backup-id"
+# Payloads handed to the create/update hook methods.
+TEST_METADATA_IMPORT: dict = {
+ "name": "test-metadata-import",
+ "database_dump": {
+ "gcs_uri": "gs://bucket_name/path_inside_bucket",
+ "database_type": "MYSQL",
+ },
+}
+TEST_METADATA_IMPORT_ID: str = "test-metadata-import-id"
+TEST_SERVICE: dict = {"name": "test-service"}
+TEST_SERVICE_ID: str = "test-service-id"
+TEST_SERVICE_TO_UPDATE = {
+ "labels": {
+ "first_key": "first_value",
+ "second_key": "second_value",
+ }
+}
+TEST_UPDATE_MASK: dict = {"paths": ["labels"]}
+# Resource-name templates; the tests fill them in with str.format using the
+# project, region, service and backup identifiers above.
+TEST_PARENT: str = "projects/{}/locations/{}"
+TEST_PARENT_SERVICES: str = "projects/{}/locations/{}/services/{}"
+TEST_PARENT_BACKUPS: str = "projects/{}/locations/{}/services/{}/backups"
+TEST_NAME_BACKUPS: str = "projects/{}/locations/{}/services/{}/backups/{}"
+TEST_DESTINATION_GCS_FOLDER: str = "gs://bucket_name/path_inside_bucket"
+
+# Dotted-path templates used to build the targets given to mock.patch.
+BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
+DATAPROC_METASTORE_STRING = "airflow.providers.google.cloud.hooks.dataproc_metastore.{}"
+
+
+# Tests for DataprocMetastoreHook built with GoogleBaseHook.__init__ replaced by
+# mock_base_gcp_hook_default_project_id. Each test patches get_dataproc_metastore_client,
+# calls one hook method and asserts the exact arguments passed to the client method.
+class TestDataprocMetastoreWithDefaultProjectIdHook(TestCase):
+ def setUp(self):
+ # Only the base-hook constructor is patched; the hook instance is kept for the tests.
+ with mock.patch(
+ BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_base_gcp_hook_default_project_id
+ ):
+ self.hook = DataprocMetastoreHook(gcp_conn_id=TEST_GCP_CONN_ID)
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_create_backup(self, mock_client) -> None:
+ # create_backup is expected to use the service path as the request parent.
+ self.hook.create_backup(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ backup=TEST_BACKUP,
+ backup_id=TEST_BACKUP_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.create_backup.assert_called_once_with(
+ request=dict(
+ parent=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ backup=TEST_BACKUP,
+ backup_id=TEST_BACKUP_ID,
+ request_id=None,
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_create_metadata_import(self, mock_client) -> None:
+ # create_metadata_import is expected to use the service path as the request parent.
+ self.hook.create_metadata_import(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ metadata_import=TEST_METADATA_IMPORT,
+ metadata_import_id=TEST_METADATA_IMPORT_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.create_metadata_import.assert_called_once_with(
+ request=dict(
+ parent=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ metadata_import=TEST_METADATA_IMPORT,
+ metadata_import_id=TEST_METADATA_IMPORT_ID,
+ request_id=None,
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_create_service(self, mock_client) -> None:
+ # create_service is expected to use the project/location path as the request parent.
+ self.hook.create_service(
+ region=TEST_REGION,
+ project_id=TEST_PROJECT_ID,
+ service=TEST_SERVICE,
+ service_id=TEST_SERVICE_ID,
+ )
+ mock_client.assert_called_once()
+ # NOTE(review): this is the only assertion in the class that expects metadata=() rather
+ # than None; presumably it mirrors a different default in the hook - confirm.
+ mock_client.return_value.create_service.assert_called_once_with(
+ request=dict(
+ parent=TEST_PARENT.format(TEST_PROJECT_ID, TEST_REGION),
+ service_id=TEST_SERVICE_ID,
+ service=TEST_SERVICE,
+ request_id=None,
+ ),
+ metadata=(),
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_delete_backup(self, mock_client) -> None:
+ # delete_backup is expected to address the backup by its full resource name.
+ self.hook.delete_backup(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ backup_id=TEST_BACKUP_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.delete_backup.assert_called_once_with(
+ request=dict(
+ name=TEST_NAME_BACKUPS.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID, TEST_BACKUP_ID),
+ request_id=None,
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_delete_service(self, mock_client) -> None:
+ # delete_service is expected to address the service by its full resource name.
+ self.hook.delete_service(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.delete_service.assert_called_once_with(
+ request=dict(
+ name=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ request_id=None,
+ ),
+ retry=None,
+ timeout=None,
+ metadata=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_export_metadata(self, mock_client) -> None:
+ # export_metadata is expected to forward the GCS folder and a None database_dump_type.
+ self.hook.export_metadata(
+ destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.export_metadata.assert_called_once_with(
+ request=dict(
+ destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+ service=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ request_id=None,
+ database_dump_type=None,
+ ),
+ retry=None,
+ timeout=None,
+ metadata=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_get_service(self, mock_client) -> None:
+ # get_service is expected to send only the service resource name.
+ self.hook.get_service(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.get_service.assert_called_once_with(
+ request=dict(
+ name=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_list_backups(self, mock_client) -> None:
+ # list_backups is expected to use the ".../backups" path as parent and None paging options.
+ self.hook.list_backups(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.list_backups.assert_called_once_with(
+ request=dict(
+ parent=TEST_PARENT_BACKUPS.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ page_size=None,
+ page_token=None,
+ filter=None,
+ order_by=None,
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_restore_service(self, mock_client) -> None:
+ # The same project, region and service are used for both the target and the backup source.
+ self.hook.restore_service(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ backup_project_id=TEST_PROJECT_ID,
+ backup_region=TEST_REGION,
+ backup_service_id=TEST_SERVICE_ID,
+ backup_id=TEST_BACKUP_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.restore_service.assert_called_once_with(
+ request=dict(
+ service=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ backup=TEST_NAME_BACKUPS.format(
+ TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID, TEST_BACKUP_ID
+ ),
+ restore_type=None,
+ request_id=None,
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_update_service(self, mock_client) -> None:
+ # The expected request carries only service, update_mask and request_id.
+ self.hook.update_service(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ service=TEST_SERVICE_TO_UPDATE,
+ update_mask=TEST_UPDATE_MASK,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.update_service.assert_called_once_with(
+ request=dict(
+ service=TEST_SERVICE_TO_UPDATE,
+ update_mask=TEST_UPDATE_MASK,
+ request_id=None,
+ ),
+ retry=None,
+ timeout=None,
+ metadata=None,
+ )
+
+
+class TestDataprocMetastoreWithoutDefaultProjectIdHook(TestCase):
+ def setUp(self):
+ with mock.patch(
+ BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_base_gcp_hook_no_default_project_id
+ ):
+ self.hook = DataprocMetastoreHook(gcp_conn_id=TEST_GCP_CONN_ID)
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_create_backup(self, mock_client) -> None:
+ self.hook.create_backup(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ backup=TEST_BACKUP,
+ backup_id=TEST_BACKUP_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.create_backup.assert_called_once_with(
+ request=dict(
+ parent=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ backup=TEST_BACKUP,
+ backup_id=TEST_BACKUP_ID,
+ request_id=None,
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_create_metadata_import(self, mock_client) -> None:
+ self.hook.create_metadata_import(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ metadata_import=TEST_METADATA_IMPORT,
+ metadata_import_id=TEST_METADATA_IMPORT_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.create_metadata_import.assert_called_once_with(
+ request=dict(
+ parent=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ metadata_import=TEST_METADATA_IMPORT,
+ metadata_import_id=TEST_METADATA_IMPORT_ID,
+ request_id=None,
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_create_service(self, mock_client) -> None:
+ self.hook.create_service(
+ region=TEST_REGION,
+ project_id=TEST_PROJECT_ID,
+ service=TEST_SERVICE,
+ service_id=TEST_SERVICE_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.create_service.assert_called_once_with(
+ request=dict(
+ parent=TEST_PARENT.format(TEST_PROJECT_ID, TEST_REGION),
+ service_id=TEST_SERVICE_ID,
+ service=TEST_SERVICE,
+ request_id=None,
+ ),
+ metadata=(),
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_delete_backup(self, mock_client) -> None:
+ self.hook.delete_backup(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ backup_id=TEST_BACKUP_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.delete_backup.assert_called_once_with(
+ request=dict(
+ name=TEST_NAME_BACKUPS.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID, TEST_BACKUP_ID),
+ request_id=None,
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_delete_service(self, mock_client) -> None:
+ self.hook.delete_service(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.delete_service.assert_called_once_with(
+ request=dict(
+ name=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ request_id=None,
+ ),
+ retry=None,
+ timeout=None,
+ metadata=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_export_metadata(self, mock_client) -> None:
+ self.hook.export_metadata(
+ destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.export_metadata.assert_called_once_with(
+ request=dict(
+ destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+ service=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ request_id=None,
+ database_dump_type=None,
+ ),
+ retry=None,
+ timeout=None,
+ metadata=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_get_service(self, mock_client) -> None:
+ self.hook.get_service(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.get_service.assert_called_once_with(
+ request=dict(
+ name=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_list_backups(self, mock_client) -> None:
+ self.hook.list_backups(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.list_backups.assert_called_once_with(
+ request=dict(
+ parent=TEST_PARENT_BACKUPS.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ page_size=None,
+ page_token=None,
+ filter=None,
+ order_by=None,
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_restore_service(self, mock_client) -> None:
+ self.hook.restore_service(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ backup_project_id=TEST_PROJECT_ID,
+ backup_region=TEST_REGION,
+ backup_service_id=TEST_SERVICE_ID,
+ backup_id=TEST_BACKUP_ID,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.restore_service.assert_called_once_with(
+ request=dict(
+ service=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+ backup=TEST_NAME_BACKUPS.format(
+ TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID, TEST_BACKUP_ID
+ ),
+ restore_type=None,
+ request_id=None,
+ ),
+ metadata=None,
+ retry=None,
+ timeout=None,
+ )
+
+ @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+ def test_update_service(self, mock_client) -> None:
+ self.hook.update_service(
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ service_id=TEST_SERVICE_ID,
+ service=TEST_SERVICE_TO_UPDATE,
+ update_mask=TEST_UPDATE_MASK,
+ )
+ mock_client.assert_called_once()
+ mock_client.return_value.update_service.assert_called_once_with(
+ request=dict(
+ service=TEST_SERVICE_TO_UPDATE,
+ update_mask=TEST_UPDATE_MASK,
+ request_id=None,
+ ),
+ retry=None,
+ timeout=None,
+ metadata=None,
+ )
diff --git a/tests/providers/google/cloud/operators/test_dataproc_metastore.py b/tests/providers/google/cloud/operators/test_dataproc_metastore.py
new file mode 100644
index 0000000..652b983
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_dataproc_metastore.py
@@ -0,0 +1,396 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase, mock
+
+from google.api_core.retry import Retry
+
+from airflow.providers.google.cloud.operators.dataproc_metastore import (
+ DataprocMetastoreCreateBackupOperator,
+ DataprocMetastoreCreateMetadataImportOperator,
+ DataprocMetastoreCreateServiceOperator,
+ DataprocMetastoreDeleteBackupOperator,
+ DataprocMetastoreDeleteServiceOperator,
+ DataprocMetastoreExportMetadataOperator,
+ DataprocMetastoreGetServiceOperator,
+ DataprocMetastoreListBackupsOperator,
+ DataprocMetastoreRestoreServiceOperator,
+ DataprocMetastoreUpdateServiceOperator,
+)
+
+# Shared fixtures for the operator tests in this module.
+# Task, location and project identifiers passed to every operator under test.
+TASK_ID: str = "task_id"
+GCP_LOCATION: str = "test-location"
+GCP_PROJECT_ID: str = "test-project-id"
+
+# Connection settings the tests expect to be forwarded to the hook constructor.
+GCP_CONN_ID: str = "test-gcp-conn-id"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+
+TEST_SERVICE: dict = {"name": "test-service"}
+TEST_SERVICE_ID: str = "test-service-id"
+
+# Call options (timeout, retry, metadata, request id) handed to the operators.
+TEST_TIMEOUT = 120
+# MagicMock whose spec is google.api_core.retry.Retry.
+TEST_RETRY = mock.MagicMock(Retry)
+TEST_METADATA = [("key", "value")]
+TEST_REQUEST_ID = "request_id_uuid"
+
+# Payloads for the backup and metadata-import operators.
+TEST_BACKUP: dict = {"name": "test-backup"}
+TEST_BACKUP_ID: str = "test-backup-id"
+TEST_METADATA_IMPORT: dict = {
+ "name": "test-metadata-import",
+ "database_dump": {
+ "gcs_uri": "gs://bucket_name/path_inside_bucket",
+ "database_type": "MYSQL",
+ },
+}
+TEST_METADATA_IMPORT_ID: str = "test-metadata-import-id"
+# Service payload and update mask used by the update-service operator test.
+TEST_SERVICE_TO_UPDATE = {
+ "labels": {
+ "first_key": "first_value",
+ "second_key": "second_value",
+ }
+}
+TEST_UPDATE_MASK: dict = {"paths": ["labels"]}
+# Destination folder used by the export-metadata operator test.
+TEST_DESTINATION_GCS_FOLDER: str = "gs://bucket_name/path_inside_bucket"
+
+
+class TestDataprocMetastoreCreateBackupOperator(TestCase):
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.Backup")
+ def test_assert_valid_hook_call(self, mock_backup, mock_hook) -> None:
+ task = DataprocMetastoreCreateBackupOperator(
+ task_id=TASK_ID,
+ project_id=GCP_PROJECT_ID,
+ region=GCP_LOCATION,
+ backup=TEST_BACKUP,
+ backup_id=TEST_BACKUP_ID,
+ service_id=TEST_SERVICE_ID,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.wait_for_operation.return_value = None
+ mock_backup.return_value.to_dict.return_value = None
+ task.execute(context=mock.MagicMock())
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+ mock_hook.return_value.create_backup.assert_called_once_with(
+ project_id=GCP_PROJECT_ID,
+ region=GCP_LOCATION,
+ backup=TEST_BACKUP,
+ backup_id=TEST_BACKUP_ID,
+ service_id=TEST_SERVICE_ID,
+ request_id=None,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ )
+
+
+class TestDataprocMetastoreCreateMetadataImportOperator(TestCase):
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.MetadataImport")
+ def test_assert_valid_hook_call(self, mock_metadata_import, mock_hook) -> None:
+ task = DataprocMetastoreCreateMetadataImportOperator(
+ task_id=TASK_ID,
+ project_id=GCP_PROJECT_ID,
+ region=GCP_LOCATION,
+ service_id=TEST_SERVICE_ID,
+ metadata_import=TEST_METADATA_IMPORT,
+ metadata_import_id=TEST_METADATA_IMPORT_ID,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.wait_for_operation.return_value = None
+ mock_metadata_import.return_value.to_dict.return_value = None
+ task.execute(context=mock.MagicMock())
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+ mock_hook.return_value.create_metadata_import.assert_called_once_with(
+ project_id=GCP_PROJECT_ID,
+ region=GCP_LOCATION,
+ service_id=TEST_SERVICE_ID,
+ metadata_import=TEST_METADATA_IMPORT,
+ metadata_import_id=TEST_METADATA_IMPORT_ID,
+ request_id=None,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ )
+
+
+class TestDataprocMetastoreCreateServiceOperator(TestCase):
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.Service")
+ def test_execute(self, mock_service, mock_hook) -> None:
+ task = DataprocMetastoreCreateServiceOperator(
+ task_id=TASK_ID,
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ service=TEST_SERVICE,
+ service_id=TEST_SERVICE_ID,
+ request_id=TEST_REQUEST_ID,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.wait_for_operation.return_value = None
+ mock_service.return_value.to_dict.return_value = None
+ task.execute(context=mock.MagicMock())
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+ mock_hook.return_value.create_service.assert_called_once_with(
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ service=TEST_SERVICE,
+ service_id=TEST_SERVICE_ID,
+ request_id=TEST_REQUEST_ID,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ )
+
+
+class TestDataprocMetastoreDeleteBackupOperator(TestCase):
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+ def test_assert_valid_hook_call(self, mock_hook) -> None:
+ task = DataprocMetastoreDeleteBackupOperator(
+ task_id=TASK_ID,
+ project_id=GCP_PROJECT_ID,
+ region=GCP_LOCATION,
+ retry=TEST_RETRY,
+ service_id=TEST_SERVICE_ID,
+ backup_id=TEST_BACKUP_ID,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.wait_for_operation.return_value = None
+ task.execute(context=mock.MagicMock())
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+ mock_hook.return_value.delete_backup.assert_called_once_with(
+ project_id=GCP_PROJECT_ID,
+ region=GCP_LOCATION,
+ service_id=TEST_SERVICE_ID,
+ backup_id=TEST_BACKUP_ID,
+ request_id=None,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ )
+
+
+class TestDataprocMetastoreDeleteServiceOperator(TestCase):
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+ def test_execute(self, mock_hook) -> None:
+ task = DataprocMetastoreDeleteServiceOperator(
+ task_id=TASK_ID,
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ service_id=TEST_SERVICE_ID,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.wait_for_operation.return_value = None
+ task.execute(context=mock.MagicMock())
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+ mock_hook.return_value.delete_service.assert_called_once_with(
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ service_id=TEST_SERVICE_ID,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ )
+
+
+class TestDataprocMetastoreExportMetadataOperator(TestCase):
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.MetadataExport")
+ @mock.patch(
+ "airflow.providers.google.cloud.operators.dataproc_metastore"
+ ".DataprocMetastoreExportMetadataOperator._wait_for_export_metadata"
+ )
+ def test_assert_valid_hook_call(self, mock_wait, mock_export_metadata, mock_hook) -> None:
+ task = DataprocMetastoreExportMetadataOperator(
+ task_id=TASK_ID,
+ service_id=TEST_SERVICE_ID,
+ destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+ project_id=GCP_PROJECT_ID,
+ region=GCP_LOCATION,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_wait.return_value = None
+ mock_export_metadata.return_value.to_dict.return_value = None
+ task.execute(context=mock.MagicMock())
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+ mock_hook.return_value.export_metadata.assert_called_once_with(
+ database_dump_type=None,
+ destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+ project_id=GCP_PROJECT_ID,
+ region=GCP_LOCATION,
+ service_id=TEST_SERVICE_ID,
+ request_id=None,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ )
+
+
+class TestDataprocMetastoreGetServiceOperator(TestCase):
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.Service")
+ def test_execute(self, mock_service, mock_hook) -> None:
+ task = DataprocMetastoreGetServiceOperator(
+ task_id=TASK_ID,
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ service_id=TEST_SERVICE_ID,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.wait_for_operation.return_value = None
+ mock_service.return_value.to_dict.return_value = None
+ task.execute(context=mock.MagicMock())
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+ mock_hook.return_value.get_service.assert_called_once_with(
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ service_id=TEST_SERVICE_ID,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ )
+
+
+class TestDataprocMetastoreListBackupsOperator(TestCase):
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.Backup")
+ def test_assert_valid_hook_call(self, mock_backup, mock_hook) -> None:
+ task = DataprocMetastoreListBackupsOperator(
+ task_id=TASK_ID,
+ project_id=GCP_PROJECT_ID,
+ region=GCP_LOCATION,
+ service_id=TEST_SERVICE_ID,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_hook.return_value.wait_for_operation.return_value = None
+ mock_backup.return_value.to_dict.return_value = None
+ task.execute(context=mock.MagicMock())
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+ mock_hook.return_value.list_backups.assert_called_once_with(
+ project_id=GCP_PROJECT_ID,
+ region=GCP_LOCATION,
+ service_id=TEST_SERVICE_ID,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ filter=None,
+ order_by=None,
+ page_size=None,
+ page_token=None,
+ )
+
+
+class TestDataprocMetastoreRestoreServiceOperator(TestCase):
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+ @mock.patch(
+ "airflow.providers.google.cloud.operators.dataproc_metastore"
+ ".DataprocMetastoreRestoreServiceOperator._wait_for_restore_service"
+ )
+ def test_assert_valid_hook_call(self, mock_wait, mock_hook) -> None:
+ task = DataprocMetastoreRestoreServiceOperator(
+ task_id=TASK_ID,
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ service_id=TEST_SERVICE_ID,
+ backup_id=TEST_BACKUP_ID,
+ backup_region=GCP_LOCATION,
+ backup_project_id=GCP_PROJECT_ID,
+ backup_service_id=TEST_SERVICE_ID,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ mock_wait.return_value = None
+ task.execute(context=mock.MagicMock())
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+ mock_hook.return_value.restore_service.assert_called_once_with(
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ service_id=TEST_SERVICE_ID,
+ backup_id=TEST_BACKUP_ID,
+ backup_region=GCP_LOCATION,
+ backup_project_id=GCP_PROJECT_ID,
+ backup_service_id=TEST_SERVICE_ID,
+ restore_type=None,
+ request_id=None,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ )
+
+
+class TestDataprocMetastoreUpdateServiceOperator(TestCase):
+ @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+ def test_assert_valid_hook_call(self, mock_hook) -> None:
+ task = DataprocMetastoreUpdateServiceOperator(
+ task_id=TASK_ID,
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ service_id=TEST_SERVICE_ID,
+ service=TEST_SERVICE_TO_UPDATE,
+ update_mask=TEST_UPDATE_MASK,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ task.execute(context=mock.MagicMock())
+ mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+ mock_hook.return_value.update_service.assert_called_once_with(
+ region=GCP_LOCATION,
+ project_id=GCP_PROJECT_ID,
+ service_id=TEST_SERVICE_ID,
+ service=TEST_SERVICE_TO_UPDATE,
+ update_mask=TEST_UPDATE_MASK,
+ request_id=None,
+ retry=TEST_RETRY,
+ timeout=TEST_TIMEOUT,
+ metadata=TEST_METADATA,
+ )
diff --git a/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py b/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py
new file mode 100644
index 0000000..3c1ad88
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from airflow.providers.google.cloud.example_dags.example_dataproc_metastore import BUCKET
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_DATAPROC_KEY
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_DATAPROC_KEY)
+class DataprocMetastoreExampleDagsTest(GoogleSystemTest):
+ @provide_gcp_context(GCP_DATAPROC_KEY)
+ def setUp(self):
+ super().setUp()
+ self.create_gcs_bucket(BUCKET)
+
+ @provide_gcp_context(GCP_DATAPROC_KEY)
+ def tearDown(self):
+ self.delete_gcs_bucket(BUCKET)
+ super().tearDown()
+
+ @provide_gcp_context(GCP_DATAPROC_KEY)
+ def test_run_example_dag(self):
+ self.run_dag(dag_id="example_gcp_dataproc_metastore", dag_folder=CLOUD_DAG_FOLDER)