Posted to commits@airflow.apache.org by po...@apache.org on 2021/11/07 23:37:43 UTC

[airflow] branch main updated: Add dataproc metastore operators (#18945)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 26ad55b  Add dataproc metastore operators (#18945)
26ad55b is described below

commit 26ad55beb00f5a0915ba4bec541e3d67044834e9
Author: Wojciech Januszek <wj...@sigma.ug.edu.pl>
AuthorDate: Mon Nov 8 00:37:21 2021 +0100

    Add dataproc metastore operators (#18945)
---
 .../example_dags/example_dataproc_metastore.py     |  216 ++++
 .../google/cloud/hooks/dataproc_metastore.py       |  676 +++++++++++++
 .../google/cloud/operators/dataproc_metastore.py   | 1068 ++++++++++++++++++++
 airflow/providers/google/provider.yaml             |   11 +
 .../operators/cloud/dataproc_metastore.rst         |  196 ++++
 setup.py                                           |    1 +
 .../google/cloud/hooks/test_dataproc_metastore.py  |  489 +++++++++
 .../cloud/operators/test_dataproc_metastore.py     |  396 ++++++++
 .../operators/test_dataproc_metastore_system.py    |   40 +
 9 files changed, 3093 insertions(+)

diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py b/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
new file mode 100644
index 0000000..563a044
--- /dev/null
+++ b/airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
@@ -0,0 +1,216 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG that shows how to use various Dataproc Metastore
+operators to manage a service.
+"""
+
+import datetime
+import os
+
+from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.dataproc_metastore import (
+    DataprocMetastoreCreateBackupOperator,
+    DataprocMetastoreCreateMetadataImportOperator,
+    DataprocMetastoreCreateServiceOperator,
+    DataprocMetastoreDeleteBackupOperator,
+    DataprocMetastoreDeleteServiceOperator,
+    DataprocMetastoreExportMetadataOperator,
+    DataprocMetastoreGetServiceOperator,
+    DataprocMetastoreListBackupsOperator,
+    DataprocMetastoreRestoreServiceOperator,
+    DataprocMetastoreUpdateServiceOperator,
+)
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<PROJECT_ID>")
+SERVICE_ID = os.environ.get("GCP_DATAPROC_METASTORE_SERVICE_ID", "dataproc-metastore-system-tests-service-1")
+BACKUP_ID = os.environ.get("GCP_DATAPROC_METASTORE_BACKUP_ID", "dataproc-metastore-system-tests-backup-1")
+REGION = os.environ.get("GCP_REGION", "<REGION>")
+BUCKET = os.environ.get("GCP_DATAPROC_METASTORE_BUCKET", "INVALID BUCKET NAME")
+METADATA_IMPORT_FILE = os.environ.get("GCS_METADATA_IMPORT_FILE", None)
+GCS_URI = os.environ.get("GCS_URI", f"gs://{BUCKET}/data/hive.sql")
+METADATA_IMPORT_ID = "dataproc-metastore-system-tests-metadata-import-1"
+TIMEOUT = 1200
+DB_TYPE = "MYSQL"
+DESTINATION_GCS_FOLDER = f"gs://{BUCKET}"
+
+# Service definition
+# Docs: https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services#Service
+# [START how_to_cloud_dataproc_metastore_create_service]
+SERVICE = {
+    "name": "test-service",
+}
+# [END how_to_cloud_dataproc_metastore_create_service]
+
+# Update service
+# [START how_to_cloud_dataproc_metastore_update_service]
+SERVICE_TO_UPDATE = {
+    "labels": {
+        "mylocalmachine": "mylocalmachine",
+        "systemtest": "systemtest",
+    }
+}
+UPDATE_MASK = {"paths": ["labels"]}
+# [END how_to_cloud_dataproc_metastore_update_service]
+
+# Backup definition
+# [START how_to_cloud_dataproc_metastore_create_backup]
+BACKUP = {
+    "name": "test-backup",
+}
+# [END how_to_cloud_dataproc_metastore_create_backup]
+
+# Metadata import definition
+# [START how_to_cloud_dataproc_metastore_create_metadata_import]
+METADATA_IMPORT = {
+    "name": "test-metadata-import",
+    "database_dump": {
+        "gcs_uri": GCS_URI,
+        "database_type": DB_TYPE,
+    },
+}
+# [END how_to_cloud_dataproc_metastore_create_metadata_import]
+
+
+with models.DAG(
+    "example_gcp_dataproc_metastore", start_date=datetime.datetime(2021, 1, 1), schedule_interval="@once"
+) as dag:
+    # [START how_to_cloud_dataproc_metastore_create_service_operator]
+    create_service = DataprocMetastoreCreateServiceOperator(
+        task_id="create_service",
+        region=REGION,
+        project_id=PROJECT_ID,
+        service=SERVICE,
+        service_id=SERVICE_ID,
+        timeout=TIMEOUT,
+    )
+    # [END how_to_cloud_dataproc_metastore_create_service_operator]
+
+    # [START how_to_cloud_dataproc_metastore_get_service_operator]
+    get_service_details = DataprocMetastoreGetServiceOperator(
+        task_id="get_service",
+        region=REGION,
+        project_id=PROJECT_ID,
+        service_id=SERVICE_ID,
+    )
+    # [END how_to_cloud_dataproc_metastore_get_service_operator]
+
+    # [START how_to_cloud_dataproc_metastore_update_service_operator]
+    update_service = DataprocMetastoreUpdateServiceOperator(
+        task_id="update_service",
+        project_id=PROJECT_ID,
+        service_id=SERVICE_ID,
+        region=REGION,
+        service=SERVICE_TO_UPDATE,
+        update_mask=UPDATE_MASK,
+        timeout=TIMEOUT,
+    )
+    # [END how_to_cloud_dataproc_metastore_update_service_operator]
+
+    # [START how_to_cloud_dataproc_metastore_create_metadata_import_operator]
+    import_metadata = DataprocMetastoreCreateMetadataImportOperator(
+        task_id="create_metadata_import",
+        project_id=PROJECT_ID,
+        region=REGION,
+        service_id=SERVICE_ID,
+        metadata_import=METADATA_IMPORT,
+        metadata_import_id=METADATA_IMPORT_ID,
+        timeout=TIMEOUT,
+    )
+    # [END how_to_cloud_dataproc_metastore_create_metadata_import_operator]
+
+    # [START how_to_cloud_dataproc_metastore_export_metadata_operator]
+    export_metadata = DataprocMetastoreExportMetadataOperator(
+        task_id="export_metadata",
+        destination_gcs_folder=DESTINATION_GCS_FOLDER,
+        project_id=PROJECT_ID,
+        region=REGION,
+        service_id=SERVICE_ID,
+        timeout=TIMEOUT,
+    )
+    # [END how_to_cloud_dataproc_metastore_export_metadata_operator]
+
+    # [START how_to_cloud_dataproc_metastore_create_backup_operator]
+    backup_service = DataprocMetastoreCreateBackupOperator(
+        task_id="create_backup",
+        project_id=PROJECT_ID,
+        region=REGION,
+        service_id=SERVICE_ID,
+        backup=BACKUP,
+        backup_id=BACKUP_ID,
+        timeout=TIMEOUT,
+    )
+    # [END how_to_cloud_dataproc_metastore_create_backup_operator]
+
+    # [START how_to_cloud_dataproc_metastore_list_backups_operator]
+    list_backups = DataprocMetastoreListBackupsOperator(
+        task_id="list_backups",
+        project_id=PROJECT_ID,
+        region=REGION,
+        service_id=SERVICE_ID,
+    )
+    # [END how_to_cloud_dataproc_metastore_list_backups_operator]
+
+    # [START how_to_cloud_dataproc_metastore_delete_backup_operator]
+    delete_backup = DataprocMetastoreDeleteBackupOperator(
+        task_id="delete_backup",
+        project_id=PROJECT_ID,
+        region=REGION,
+        service_id=SERVICE_ID,
+        backup_id=BACKUP_ID,
+        timeout=TIMEOUT,
+    )
+    # [END how_to_cloud_dataproc_metastore_delete_backup_operator]
+
+    # [START how_to_cloud_dataproc_metastore_restore_service_operator]
+    restore_service = DataprocMetastoreRestoreServiceOperator(
+        task_id="restore_metastore",
+        region=REGION,
+        project_id=PROJECT_ID,
+        service_id=SERVICE_ID,
+        backup_id=BACKUP_ID,
+        backup_region=REGION,
+        backup_project_id=PROJECT_ID,
+        backup_service_id=SERVICE_ID,
+        timeout=TIMEOUT,
+    )
+    # [END how_to_cloud_dataproc_metastore_restore_service_operator]
+
+    # [START how_to_cloud_dataproc_metastore_delete_service_operator]
+    delete_service = DataprocMetastoreDeleteServiceOperator(
+        task_id="delete_service",
+        region=REGION,
+        project_id=PROJECT_ID,
+        service_id=SERVICE_ID,
+        timeout=TIMEOUT,
+    )
+    # [END how_to_cloud_dataproc_metastore_delete_service_operator]
+
+    chain(
+        create_service,
+        update_service,
+        get_service_details,
+        backup_service,
+        list_backups,
+        restore_service,
+        delete_backup,
+        export_metadata,
+        import_metadata,
+        delete_service,
+    )
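
Note on task wiring: ``chain`` at the end of the DAG above sets a strictly linear ordering, equivalent to wiring the tasks with ``>>``. A minimal sketch of the equivalence (the DummyOperator tasks are hypothetical stand-ins):

    from airflow.models.baseoperator import chain
    from airflow.operators.dummy import DummyOperator

    # inside a `with models.DAG(...)` block:
    t1, t2, t3 = (DummyOperator(task_id=f"t{i}") for i in (1, 2, 3))
    chain(t1, t2, t3)  # equivalent to: t1 >> t2 >> t3
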
diff --git a/airflow/providers/google/cloud/hooks/dataproc_metastore.py b/airflow/providers/google/cloud/hooks/dataproc_metastore.py
new file mode 100644
index 0000000..7a645ff
--- /dev/null
+++ b/airflow/providers/google/cloud/hooks/dataproc_metastore.py
@@ -0,0 +1,676 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains a Google Cloud Dataproc Metastore hook."""
+
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+from google.api_core.operation import Operation
+from google.api_core.retry import Retry
+from google.cloud.metastore_v1 import DataprocMetastoreClient
+from google.cloud.metastore_v1.types import Backup, MetadataImport, Service
+from google.cloud.metastore_v1.types.metastore import DatabaseDumpSpec, Restore
+from google.protobuf.field_mask_pb2 import FieldMask
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+
+class DataprocMetastoreHook(GoogleBaseHook):
+    """Hook for Google Cloud Dataproc Metastore APIs."""
+
+    def get_dataproc_metastore_client(self) -> DataprocMetastoreClient:
+        """Returns DataprocMetastoreClient."""
+        client_options = {'api_endpoint': 'metastore.googleapis.com:443'}
+
+        return DataprocMetastoreClient(
+            credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options
+        )
+
+    def wait_for_operation(self, timeout: float, operation: Operation):
+        """Waits for long-lasting operation to complete."""
+        try:
+            return operation.result(timeout=timeout)
+        except Exception:
+            error = operation.exception(timeout=timeout)
+            raise AirflowException(error)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_backup(
+        self,
+        project_id: str,
+        region: str,
+        service_id: str,
+        backup: Backup,
+        backup_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Creates a new backup in a given project and location.
+
+        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+        :type project_id: str
+        :param region: Required. The ID of the Google Cloud region that the service belongs to.
+        :type region: str
+        :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+            the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+            with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+            hyphens.
+
+            This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type service_id: str
+        :param backup:  Required. The backup to create. The ``name`` field is ignored. The ID of the created
+            backup must be provided in the request's ``backup_id`` field.
+
+            This corresponds to the ``backup`` field on the ``request`` instance; if ``request`` is provided,
+            this should not be set.
+        :type backup: google.cloud.metastore_v1.types.Backup
+        :param backup_id:  Required. The ID of the backup, which is used as the final component of the
+            backup's name. This value must be between 1 and 64 characters long, begin with a letter, end with
+            a letter or number, and consist of alphanumeric ASCII characters or hyphens.
+
+            This corresponds to the ``backup_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type backup_id: str
+        :param request_id: Optional. A unique id used to identify the request.
+        :type request_id: str
+        :param retry: Designation of what errors, if any, should be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The timeout for this request.
+        :type timeout: float
+        :param metadata: Strings which should be sent along with the request as metadata.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        parent = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+        client = self.get_dataproc_metastore_client()
+        result = client.create_backup(
+            request={
+                'parent': parent,
+                'backup': backup,
+                'backup_id': backup_id,
+                'request_id': request_id,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_metadata_import(
+        self,
+        project_id: str,
+        region: str,
+        service_id: str,
+        metadata_import: MetadataImport,
+        metadata_import_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Creates a new MetadataImport in a given project and location.
+
+        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+        :type project_id: str
+        :param region: Required. The ID of the Google Cloud region that the service belongs to.
+        :type region: str
+        :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+            the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+            with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+            hyphens.
+
+            This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type service_id: str
+        :param metadata_import:  Required. The metadata import to create. The ``name`` field is ignored. The
+            ID of the created metadata import must be provided in the request's ``metadata_import_id`` field.
+
+            This corresponds to the ``metadata_import`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type metadata_import: google.cloud.metastore_v1.types.MetadataImport
+        :param metadata_import_id:  Required. The ID of the metadata import, which is used as the final
+            component of the metadata import's name. This value must be between 1 and 64 characters long,
+            begin with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+            hyphens.
+
+            This corresponds to the ``metadata_import_id`` field on the ``request`` instance; if ``request``
+            is provided, this should not be set.
+        :type metadata_import_id: str
+        :param request_id: Optional. A unique id used to identify the request.
+        :type request_id: str
+        :param retry: Designation of what errors, if any, should be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The timeout for this request.
+        :type timeout: float
+        :param metadata: Strings which should be sent along with the request as metadata.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        parent = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+        client = self.get_dataproc_metastore_client()
+        result = client.create_metadata_import(
+            request={
+                'parent': parent,
+                'metadata_import': metadata_import,
+                'metadata_import_id': metadata_import_id,
+                'request_id': request_id,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_service(
+        self,
+        region: str,
+        project_id: str,
+        service: Union[Dict, Service],
+        service_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+    ):
+        """
+        Creates a metastore service in a project and location.
+
+        :param region: Required. The ID of the Google Cloud region that the service belongs to.
+        :type region: str
+        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+        :type project_id: str
+        :param service:  Required. The Metastore service to create. The ``name`` field is ignored. The ID of
+            the created metastore service must be provided in the request's ``service_id`` field.
+
+            This corresponds to the ``service`` field on the ``request`` instance; if ``request`` is provided,
+            this should not be set.
+        :type service: google.cloud.metastore_v1.types.Service
+        :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+            the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+            with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+            hyphens.
+
+            This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type service_id: str
+        :param request_id: Optional. A unique id used to identify the request.
+        :type request_id: str
+        :param retry: Designation of what errors, if any, should be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The timeout for this request.
+        :type timeout: float
+        :param metadata: Strings which should be sent along with the request as metadata.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        parent = f'projects/{project_id}/locations/{region}'
+
+        client = self.get_dataproc_metastore_client()
+        result = client.create_service(
+            request={
+                'parent': parent,
+                'service_id': service_id,
+                'service': service if service else {},
+                'request_id': request_id,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_backup(
+        self,
+        project_id: str,
+        region: str,
+        service_id: str,
+        backup_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Deletes a single backup.
+
+        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+        :type project_id: str
+        :param region: Required. The ID of the Google Cloud region that the service belongs to.
+        :type region: str
+        :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+            the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+            with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+            hyphens.
+
+            This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type service_id: str
+        :param backup_id:  Required. The ID of the backup, which is used as the final component of the
+            backup's name. This value must be between 1 and 64 characters long, begin with a letter, end with
+            a letter or number, and consist of alphanumeric ASCII characters or hyphens.
+
+            This corresponds to the ``backup_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type backup_id: str
+        :param request_id: Optional. A unique id used to identify the request.
+        :type request_id: str
+        :param retry: Designation of what errors, if any, should be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The timeout for this request.
+        :type timeout: float
+        :param metadata: Strings which should be sent along with the request as metadata.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        name = f'projects/{project_id}/locations/{region}/services/{service_id}/backups/{backup_id}'
+
+        client = self.get_dataproc_metastore_client()
+        result = client.delete_backup(
+            request={
+                'name': name,
+                'request_id': request_id,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_service(
+        self,
+        project_id: str,
+        region: str,
+        service_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Deletes a single service.
+
+        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+        :type project_id: str
+        :param region: Required. The ID of the Google Cloud region that the service belongs to.
+        :type region: str
+        :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+            the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+            with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+            hyphens.
+
+            This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type service_id: str
+        :param request_id: Optional. A unique id used to identify the request.
+        :type request_id: str
+        :param retry: Designation of what errors, if any, should be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The timeout for this request.
+        :type timeout: float
+        :param metadata: Strings which should be sent along with the request as metadata.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        name = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+        client = self.get_dataproc_metastore_client()
+        result = client.delete_service(
+            request={
+                'name': name,
+                'request_id': request_id,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def export_metadata(
+        self,
+        destination_gcs_folder: str,
+        project_id: str,
+        region: str,
+        service_id: str,
+        request_id: Optional[str] = None,
+        database_dump_type: Optional[DatabaseDumpSpec] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Exports metadata from a service.
+
+        :param destination_gcs_folder: A Cloud Storage URI of a folder, in the format
+            ``gs://<bucket_name>/<path_inside_bucket>``. A sub-folder
+            ``<export_folder>`` containing exported files will be
+            created below it.
+        :type destination_gcs_folder: str
+        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+        :type project_id: str
+        :param region: Required. The ID of the Google Cloud region that the service belongs to.
+        :type region: str
+        :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+            the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+            with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+            hyphens.
+
+            This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type service_id: str
+        :param request_id: Optional. A unique id used to identify the request.
+        :type request_id: str
+        :param database_dump_type: Optional. The type of the database dump. If unspecified,
+            defaults to ``MYSQL``.
+        :type database_dump_type: google.cloud.metastore_v1.types.DatabaseDumpSpec.Type
+        :param retry: Designation of what errors, if any, should be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The timeout for this request.
+        :type timeout: float
+        :param metadata: Strings which should be sent along with the request as metadata.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        service = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+        client = self.get_dataproc_metastore_client()
+        result = client.export_metadata(
+            request={
+                'destination_gcs_folder': destination_gcs_folder,
+                'service': service,
+                'request_id': request_id,
+                'database_dump_type': database_dump_type,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_service(
+        self,
+        project_id: str,
+        region: str,
+        service_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Gets the details of a single service.
+
+        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+        :type project_id: str
+        :param region: Required. The ID of the Google Cloud region that the service belongs to.
+        :type region: str
+        :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+            the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+            with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+            hyphens.
+
+            This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type service_id: str
+        :param retry: Designation of what errors, if any, should be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The timeout for this request.
+        :type timeout: float
+        :param metadata: Strings which should be sent along with the request as metadata.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        name = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+        client = self.get_dataproc_metastore_client()
+        result = client.get_service(
+            request={
+                'name': name,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
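+    # NOTE: DataprocMetastoreCreateBackupOperator falls back to a ``get_backup``
+    # hook method when the backup already exists. A minimal sketch of that
+    # accessor, assuming the v1 client exposes ``get_backup``; it mirrors
+    # ``get_service`` above:
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_backup(
+        self,
+        project_id: str,
+        region: str,
+        service_id: str,
+        backup_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """Gets the details of a single backup."""
+        name = f'projects/{project_id}/locations/{region}/services/{service_id}/backups/{backup_id}'
+
+        client = self.get_dataproc_metastore_client()
+        return client.get_backup(
+            request={
+                'name': name,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+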
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_backups(
+        self,
+        project_id: str,
+        region: str,
+        service_id: str,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        filter: Optional[str] = None,
+        order_by: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Lists backups in a service.
+
+        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+        :type project_id: str
+        :param region: Required. The ID of the Google Cloud region that the service belongs to.
+        :type region: str
+        :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+            the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+            with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+            hyphens.
+
+            This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type service_id: str
+        :param page_size: Optional. The maximum number of backups to
+            return. The response may contain fewer than the
+            maximum number. If unspecified, no more than 500
+            backups are returned. The maximum value is 1000;
+            values above 1000 are changed to 1000.
+        :type page_size: int
+        :param page_token: Optional. A page token, received from a previous
+            [DataprocMetastore.ListBackups][google.cloud.metastore.v1.DataprocMetastore.ListBackups]
+            call. Provide this token to retrieve the subsequent page.
+            To retrieve the first page, supply an empty page token.
+            When paginating, other parameters provided to
+            [DataprocMetastore.ListBackups][google.cloud.metastore.v1.DataprocMetastore.ListBackups]
+            must match the call that provided the page token.
+        :type page_token: str
+        :param filter: Optional. The filter to apply to list
+            results.
+        :type filter: str
+        :param order_by: Optional. Specify the ordering of results as described in
+            `Sorting
+            Order <https://cloud.google.com/apis/design/design_patterns#sorting_order>`__.
+            If not specified, the results will be sorted in the default
+            order.
+        :type order_by: str
+        :param retry: Designation of what errors, if any, should be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The timeout for this request.
+        :type timeout: float
+        :param metadata: Strings which should be sent along with the request as metadata.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        parent = f'projects/{project_id}/locations/{region}/services/{service_id}/backups'
+
+        client = self.get_dataproc_metastore_client()
+        result = client.list_backups(
+            request={
+                'parent': parent,
+                'page_size': page_size,
+                'page_token': page_token,
+                'filter': filter,
+                'order_by': order_by,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def restore_service(
+        self,
+        project_id: str,
+        region: str,
+        service_id: str,
+        backup_project_id: str,
+        backup_region: str,
+        backup_service_id: str,
+        backup_id: str,
+        restore_type: Optional[Restore] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Restores a service from a backup.
+
+        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+        :type project_id: str
+        :param region: Required. The ID of the Google Cloud region that the service belongs to.
+        :type region: str
+        :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+            the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+            with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+            hyphens.
+
+            This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type service_id: str
+        :param backup_project_id: Required. The ID of the Google Cloud project that the metastore
+            service backup to restore from belongs to.
+        :type backup_project_id: str
+        :param backup_region: Required. The ID of the Google Cloud region that the metastore
+            service backup to restore from belongs to.
+        :type backup_region: str
+        :param backup_service_id:  Required. The ID of the metastore service backup to restore from,
+            which is used as the final component of the metastore service's name. This value must be
+            between 2 and 63 characters long inclusive, begin with a letter, end with a letter or number,
+            and consist of alphanumeric ASCII characters or hyphens.
+        :type backup_service_id: str
+        :param backup_id:  Required. The ID of the metastore service backup to restore from.
+        :type backup_id: str
+        :param restore_type: Optional. The type of restore. If unspecified, defaults to
+            ``METADATA_ONLY``.
+        :type restore_type: google.cloud.metastore_v1.types.Restore.RestoreType
+        :param request_id: Optional. A unique id used to identify the request.
+        :type request_id: str
+        :param retry: Designation of what errors, if any, should be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The timeout for this request.
+        :type timeout: float
+        :param metadata: Strings which should be sent along with the request as metadata.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        service = f'projects/{project_id}/locations/{region}/services/{service_id}'
+        backup = (
+            f'projects/{backup_project_id}/locations/{backup_region}/services/'
+            f'{backup_service_id}/backups/{backup_id}'
+        )
+
+        client = self.get_dataproc_metastore_client()
+        result = client.restore_service(
+            request={
+                'service': service,
+                'backup': backup,
+                'restore_type': restore_type,
+                'request_id': request_id,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_service(
+        self,
+        project_id: str,
+        region: str,
+        service_id: str,
+        service: Union[Dict, Service],
+        update_mask: FieldMask,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+    ):
+        """
+        Updates the parameters of a single service.
+
+        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+        :type project_id: str
+        :param region: Required. The ID of the Google Cloud region that the service belongs to.
+        :type region: str
+        :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+            the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+            with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+            hyphens.
+
+            This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type service_id: str
+        :param service:  Required. The metastore service to update. The server only merges fields in the
+            service if they are specified in ``update_mask``.
+
+            The metastore service's ``name`` field is used to identify the metastore service to be updated.
+
+            This corresponds to the ``service`` field on the ``request`` instance; if ``request`` is provided,
+            this should not be set.
+        :type service: Union[Dict, google.cloud.metastore_v1.types.Service]
+        :param update_mask:  Required. A field mask used to specify the fields to be overwritten in the
+            metastore service resource by the update. Fields specified in the ``update_mask`` are relative to
+            the resource (not to the full request). A field is overwritten if it is in the mask.
+
+            This corresponds to the ``update_mask`` field on the ``request`` instance; if ``request`` is
+            provided, this should not be set.
+        :type update_mask: google.protobuf.field_mask_pb2.FieldMask
+        :param request_id: Optional. A unique id used to identify the request.
+        :type request_id: str
+        :param retry: Designation of what errors, if any, should be retried.
+        :type retry: google.api_core.retry.Retry
+        :param timeout: The timeout for this request.
+        :type timeout: float
+        :param metadata: Strings which should be sent along with the request as metadata.
+        :type metadata: Sequence[Tuple[str, str]]
+        """
+        client = self.get_dataproc_metastore_client()
+
+        service_name = f'projects/{project_id}/locations/{region}/services/{service_id}'
+
+        service["name"] = service_name
+
+        result = client.update_service(
+            request={
+                'service': service,
+                'update_mask': update_mask,
+                'request_id': request_id,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
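
The hook can also be used directly, e.g. from a PythonOperator callable, instead of going through the operators below. A minimal sketch, assuming a configured ``google_cloud_default`` connection; the project, region, service ID, and the dict passed for ``service`` are hypothetical:

    from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook

    hook = DataprocMetastoreHook(gcp_conn_id="google_cloud_default")
    operation = hook.create_service(
        region="europe-west1",    # hypothetical region
        project_id="my-project",  # hypothetical project ID
        service={"port": 9083},   # assumption: a dict is accepted and coerced to Service
        service_id="my-metastore",
    )
    service = hook.wait_for_operation(timeout=1200, operation=operation)
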
diff --git a/airflow/providers/google/cloud/operators/dataproc_metastore.py b/airflow/providers/google/cloud/operators/dataproc_metastore.py
new file mode 100644
index 0000000..2823b72
--- /dev/null
+++ b/airflow/providers/google/cloud/operators/dataproc_metastore.py
@@ -0,0 +1,1068 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+"""This module contains Google Dataproc Metastore operators."""
+
+from time import sleep
+from typing import Dict, Optional, Sequence, Tuple, Union
+
+from google.api_core.retry import Retry, exponential_sleep_generator
+from google.cloud.metastore_v1 import MetadataExport, MetadataManagementActivity
+from google.cloud.metastore_v1.types import Backup, MetadataImport, Service
+from google.cloud.metastore_v1.types.metastore import DatabaseDumpSpec, Restore
+from google.protobuf.field_mask_pb2 import FieldMask
+from googleapiclient.errors import HttpError
+
+from airflow import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook
+
+
+class DataprocMetastoreCreateBackupOperator(BaseOperator):
+    """
+    Creates a new backup in a given project and location.
+
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param backup:  Required. The backup to create. The ``name`` field is ignored. The ID of the created
+        backup must be provided in the request's ``backup_id`` field.
+
+        This corresponds to the ``backup`` field on the ``request`` instance; if ``request`` is provided, this
+        should not be set.
+    :type backup: google.cloud.metastore_v1.types.Backup
+    :param backup_id:  Required. The ID of the backup, which is used as the final component of the backup's
+        name. This value must be between 1 and 64 characters long, begin with a letter, end with a letter or
+        number, and consist of alphanumeric ASCII characters or hyphens.
+
+        This corresponds to the ``backup_id`` field on the ``request`` instance; if ``request`` is provided,
+        this should not be set.
+    :type backup_id: str
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'backup',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {'backup': 'json'}
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        service_id: str,
+        backup: Union[Dict, Backup],
+        backup_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        self.backup = backup
+        self.backup_id = backup_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: dict) -> dict:
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Creating Dataproc Metastore backup: %s", self.backup_id)
+
+        try:
+            operation = hook.create_backup(
+                project_id=self.project_id,
+                region=self.region,
+                service_id=self.service_id,
+                backup=self.backup,
+                backup_id=self.backup_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            backup = hook.wait_for_operation(self.timeout, operation)
+            self.log.info("Backup %s created successfully", self.backup_id)
+        except HttpError as err:
+            if err.resp.status not in (409, '409'):
+                raise
+            self.log.info("Backup %s already exists", self.backup_id)
+            backup = hook.get_backup(
+                project_id=self.project_id,
+                region=self.region,
+                service_id=self.service_id,
+                backup_id=self.backup_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+        return Backup.to_dict(backup)
+
+
+class DataprocMetastoreCreateMetadataImportOperator(BaseOperator):
+    """
+    Creates a new MetadataImport in a given project and location.
+
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param metadata_import:  Required. The metadata import to create. The ``name`` field is ignored. The ID of
+        the created metadata import must be provided in the request's ``metadata_import_id`` field.
+
+        This corresponds to the ``metadata_import`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type metadata_import: google.cloud.metastore_v1.types.MetadataImport
+    :param metadata_import_id:  Required. The ID of the metadata import, which is used as the final component
+        of the metadata import's name. This value must be between 1 and 64 characters long, begin with a
+        letter, end with a letter or number, and consist of alphanumeric ASCII characters or hyphens.
+
+        This corresponds to the ``metadata_import_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type metadata_import_id: str
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'metadata_import',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {'metadata_import': 'json'}
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        service_id: str,
+        metadata_import: MetadataImport,
+        metadata_import_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        self.metadata_import = metadata_import
+        self.metadata_import_id = metadata_import_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: dict):
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Creating Dataproc Metastore metadata import: %s", self.metadata_import_id)
+        operation = hook.create_metadata_import(
+            project_id=self.project_id,
+            region=self.region,
+            service_id=self.service_id,
+            metadata_import=self.metadata_import,
+            metadata_import_id=self.metadata_import_id,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        metadata_import = hook.wait_for_operation(self.timeout, operation)
+        self.log.info("Metadata import %s created successfully", self.metadata_import_id)
+        return MetadataImport.to_dict(metadata_import)
+
+
+class DataprocMetastoreCreateServiceOperator(BaseOperator):
+    """
+    Creates a metastore service in a project and location.
+
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param service:  Required. The Metastore service to create. The ``name`` field is ignored. The ID of
+        the created metastore service must be provided in the request's ``service_id`` field.
+
+        This corresponds to the ``service`` field on the ``request`` instance; if ``request`` is provided,
+        this should not be set.
+    :type service: google.cloud.metastore_v1.types.Service
+    :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param retry: Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The timeout for this request.
+    :type timeout: float
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'service',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {'service': 'json'}
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: str,
+        service: Optional[Union[Dict, Service]] = None,
+        service_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.service = service
+        self.service_id = service_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context) -> dict:
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Creating Dataproc Metastore service: %s", self.project_id)
+        try:
+            operation = hook.create_service(
+                region=self.region,
+                project_id=self.project_id,
+                service=self.service,
+                service_id=self.service_id,
+                request_id=self.request_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            service = hook.wait_for_operation(self.timeout, operation)
+            self.log.info("Service %s created successfully", self.service_id)
+        except HttpError as err:
+            if err.resp.status not in (409, '409'):
+                raise
+            self.log.info("Instance %s already exists", self.service_id)
+            service = hook.get_service(
+                region=self.region,
+                project_id=self.project_id,
+                service_id=self.service_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+        return Service.to_dict(service)
+
+
+class DataprocMetastoreDeleteBackupOperator(BaseOperator):
+    """
+    Deletes a single backup.
+
+    :param project_id: Required. The ID of the Google Cloud project that the backup belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the backup belongs to.
+    :type region: str
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param backup_id:  Required. The ID of the backup, which is used as the final component of the backup's
+        name. This value must be between 1 and 64 characters long, begin with a letter, end with a letter or
+        number, and consist of alphanumeric ASCII characters or hyphens.
+
+        This corresponds to the ``backup_id`` field on the ``request`` instance; if ``request`` is provided,
+        this should not be set.
+    :type backup_id: str
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        service_id: str,
+        backup_id: str,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        self.backup_id = backup_id
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: dict) -> None:
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Deleting Dataproc Metastore backup: %s", self.backup_id)
+        operation = hook.delete_backup(
+            project_id=self.project_id,
+            region=self.region,
+            service_id=self.service_id,
+            backup_id=self.backup_id,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        hook.wait_for_operation(self.timeout, operation)
+        self.log.info("Backup %s deleted successfully", self.project_id)
+
+
+class DataprocMetastoreDeleteServiceOperator(BaseOperator):
+    """
+    Deletes a single service.
+
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+    :type service_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: str,
+        service_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.service_id = service_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context) -> None:
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Deleting Dataproc Metastore service: %s", self.project_id)
+        operation = hook.delete_service(
+            region=self.region,
+            project_id=self.project_id,
+            service_id=self.service_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        hook.wait_for_operation(self.timeout, operation)
+        self.log.info("Service %s deleted successfully", self.project_id)
+
+
+class DataprocMetastoreExportMetadataOperator(BaseOperator):
+    """
+    Exports metadata from a service.
+
+    :param destination_gcs_folder: A Cloud Storage URI of a folder, in the format
+        ``gs://<bucket_name>/<path_inside_bucket>``. A sub-folder
+        ``<export_folder>`` containing exported files will be
+        created below it.
+    :type destination_gcs_folder: str
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        destination_gcs_folder: str,
+        project_id: str,
+        region: str,
+        service_id: str,
+        request_id: Optional[str] = None,
+        database_dump_type: Optional[DatabaseDumpSpec] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.destination_gcs_folder = destination_gcs_folder
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        self.request_id = request_id
+        self.database_dump_type = database_dump_type
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Dict):
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Exporting metadata from Dataproc Metastore service: %s", self.service_id)
+        hook.export_metadata(
+            destination_gcs_folder=self.destination_gcs_folder,
+            project_id=self.project_id,
+            region=self.region,
+            service_id=self.service_id,
+            request_id=self.request_id,
+            database_dump_type=self.database_dump_type,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        metadata_export = self._wait_for_export_metadata(hook)
+        self.log.info("Metadata from service %s exported successfully", self.service_id)
+        return MetadataExport.to_dict(metadata_export)
+
+    def _wait_for_export_metadata(self, hook: DataprocMetastoreHook):
+        """
+        Workaround to check that the export finished successfully.
+        We discovered an issue with parsing the operation result into MetadataExport inside the SDK,
+        so instead we poll the service's metadata management activity.
+        """
+        for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
+            sleep(time_to_wait)
+            service = hook.get_service(
+                region=self.region,
+                project_id=self.project_id,
+                service_id=self.service_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            activities: MetadataManagementActivity = service.metadata_management_activity
+            metadata_export: MetadataExport = activities.metadata_exports[0]
+            if metadata_export.state == MetadataExport.State.SUCCEEDED:
+                return metadata_export
+            if metadata_export.state == MetadataExport.State.FAILED:
+                raise AirflowException(
+                    f"Exporting metadata from Dataproc Metastore {metadata_export.name} FAILED"
+                )
+
+
+class DataprocMetastoreGetServiceOperator(BaseOperator):
+    """
+    Gets the details of a single service.
+
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param retry: Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The timeout for this request.
+    :type timeout: float
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        region: str,
+        project_id: str,
+        service_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.region = region
+        self.project_id = project_id
+        self.service_id = service_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context) -> dict:
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Gets the details of a single Dataproc Metastore service: %s", self.project_id)
+        result = hook.get_service(
+            region=self.region,
+            project_id=self.project_id,
+            service_id=self.service_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Service.to_dict(result)
+
+
+class DataprocMetastoreListBackupsOperator(BaseOperator):
+    """
+    Lists backups in a service.
+
+    :param project_id: Required. The ID of the Google Cloud project that the backup belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the backup belongs to.
+    :type region: str
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        service_id: str,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        filter: Optional[str] = None,
+        order_by: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        self.page_size = page_size
+        self.page_token = page_token
+        self.filter = filter
+        self.order_by = order_by
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: dict) -> list:
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Listing Dataproc Metastore backups: %s", self.service_id)
+        backups = hook.list_backups(
+            project_id=self.project_id,
+            region=self.region,
+            service_id=self.service_id,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            filter=self.filter,
+            order_by=self.order_by,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [Backup.to_dict(backup) for backup in backups]
+
+
+class DataprocMetastoreRestoreServiceOperator(BaseOperator):
+    """
+    Restores a service from a backup.
+
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param backup_project_id: Required. The ID of the Google Cloud project that the metastore
+        service backup to restore from.
+    :type backup_project_id: str
+    :param backup_region: Required. The ID of the Google Cloud region that the metastore
+        service backup to restore from.
+    :type backup_region: str
+    :param backup_service_id:  Required. The ID of the metastore service backup to restore from, which is
+        used as the final component of the metastore service's name. This value must be between 2 and 63
+        characters long inclusive, begin with a letter, end with a letter or number, and consist
+        of alphanumeric ASCII characters or hyphens.
+    :type backup_service_id: str
+    :param backup_id:  Required. The ID of the metastore service backup to restore from
+    :type backup_id: str
+    :param restore_type: Optional. The type of restore. If unspecified, defaults to
+        ``METADATA_ONLY``
+    :type restore_type: google.cloud.metastore_v1.types.Restore.RestoreType
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        service_id: str,
+        backup_project_id: str,
+        backup_region: str,
+        backup_service_id: str,
+        backup_id: str,
+        restore_type: Optional[Restore] = None,
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        self.backup_project_id = backup_project_id
+        self.backup_region = backup_region
+        self.backup_service_id = backup_service_id
+        self.backup_id = backup_id
+        self.restore_type = restore_type
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context) -> None:
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info(
+            "Restoring Dataproc Metastore service: %s from backup: %s", self.service_id, self.backup_id
+        )
+        hook.restore_service(
+            project_id=self.project_id,
+            region=self.region,
+            service_id=self.service_id,
+            backup_project_id=self.backup_project_id,
+            backup_region=self.backup_region,
+            backup_service_id=self.backup_service_id,
+            backup_id=self.backup_id,
+            restore_type=self.restore_type,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        self._wait_for_restore_service(hook)
+        self.log.info("Service %s restored from backup %s", self.service_id, self.backup_id)
+
+    def _wait_for_restore_service(self, hook: DataprocMetastoreHook):
+        """
+        Workaround to check that the service restore finished successfully.
+        We discovered an issue with parsing the operation result into Restore inside the SDK,
+        so instead we poll the service's metadata management activity.
+        """
+        for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
+            sleep(time_to_wait)
+            service = hook.get_service(
+                region=self.region,
+                project_id=self.project_id,
+                service_id=self.service_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            activities: MetadataManagementActivity = service.metadata_management_activity
+            restore_service: Restore = activities.restores[0]
+            if restore_service.state == Restore.State.SUCCEEDED:
+                return restore_service
+            if restore_service.state == Restore.State.FAILED:
+                raise AirflowException("Restoring service FAILED")
+
+
+class DataprocMetastoreUpdateServiceOperator(BaseOperator):
+    """
+    Updates the parameters of a single service.
+
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :type project_id: str
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :type region: str
+    :param service_id:  Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
+
+        This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
+        provided, this should not be set.
+    :type service_id: str
+    :param service:  Required. The metastore service to update. The server only merges fields in the service
+        if they are specified in ``update_mask``.
+
+        The metastore service's ``name`` field is used to identify the metastore service to be updated.
+
+        This corresponds to the ``service`` field on the ``request`` instance; if ``request`` is provided,
+        this should not be set.
+    :type service: Union[Dict, google.cloud.metastore_v1.types.Service]
+    :param update_mask:  Required. A field mask used to specify the fields to be overwritten in the metastore
+        service resource by the update. Fields specified in the ``update_mask`` are relative to the resource
+        (not to the full request). A field is overwritten if it is in the mask.
+
+        This corresponds to the ``update_mask`` field on the ``request`` instance; if ``request`` is provided,
+        this should not be set.
+    :type update_mask: google.protobuf.field_mask_pb2.FieldMask
+    :param request_id: Optional. A unique id used to identify the request.
+    :type request_id: str
+    :param retry: Optional. Designation of what errors, if any, should be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: Optional. The timeout for this request.
+    :type timeout: float
+    :param metadata: Optional. Strings which should be sent along with the request as metadata.
+    :type metadata: Sequence[Tuple[str, str]]
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :type gcp_conn_id: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'project_id',
+        'impersonation_chain',
+    )
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        service_id: str,
+        service: Union[Dict, Service],
+        update_mask: Union[Dict, FieldMask],
+        request_id: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.service_id = service_id
+        self.service = service
+        self.update_mask = update_mask
+        self.request_id = request_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Dict):
+        hook = DataprocMetastoreHook(
+            gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
+        )
+        self.log.info("Updating Dataproc Metastore service: %s", self.service.get("name"))
+
+        operation = hook.update_service(
+            project_id=self.project_id,
+            region=self.region,
+            service_id=self.service_id,
+            service=self.service,
+            update_mask=self.update_mask,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        hook.wait_for_operation(self.timeout, operation)
+        self.log.info("Service %s updated successfully", self.service.get("name"))
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index ef842f0..ff1c896 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -230,6 +230,11 @@ integrations:
       - /docs/apache-airflow-providers-google/operators/cloud/dataprep.rst
     logo: /integration-logos/gcp/Google-Dataprep.png
     tags: [gcp]
+  - integration-name: Google Dataproc Metastore
+    external-doc-url: https://cloud.google.com/dataproc-metastore/
+    how-to-guide:
+      - /docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
+    tags: [gcp]
   - integration-name: Google Dataproc
     external-doc-url: https://cloud.google.com/dataproc/
     how-to-guide:
@@ -368,6 +373,9 @@ operators:
   - integration-name: Google Dataprep
     python-modules:
       - airflow.providers.google.cloud.operators.dataprep
+  - integration-name: Google Dataproc Metastore
+    python-modules:
+      - airflow.providers.google.cloud.operators.dataproc_metastore
   - integration-name: Google Dataproc
     python-modules:
       - airflow.providers.google.cloud.operators.dataproc
@@ -537,6 +545,9 @@ hooks:
   - integration-name: Google Dataprep
     python-modules:
       - airflow.providers.google.cloud.hooks.dataprep
+  - integration-name: Google Dataproc Metastore
+    python-modules:
+      - airflow.providers.google.cloud.hooks.dataproc_metastore
   - integration-name: Google Dataproc
     python-modules:
       - airflow.providers.google.cloud.hooks.dataproc
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst b/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
new file mode 100644
index 0000000..c7ff530
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataproc_metastore.rst
@@ -0,0 +1,196 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Cloud Dataproc Metastore Operators
+=========================================
+
+Dataproc Metastore is a fully managed, highly available, auto-healing serverless
+Apache Hive metastore (HMS) that runs on Google Cloud. It supports HMS, serves as
+a critical component for managing the metadata of relational entities,
+and provides interoperability between data processing applications in the open source data ecosystem.
+
+For more information about the service, visit the `Dataproc Metastore product documentation <https://cloud.google.com/dataproc-metastore/docs/reference>`__.
+
+Create a Service
+----------------
+
+Before you create a Dataproc Metastore service, you need to define the service.
+For more information about the available fields to pass when creating a service, visit the `Dataproc Metastore create service API <https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services#Service>`__.
+
+A simple service configuration can look as follows:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_metastore_create_service]
+    :end-before: [END how_to_cloud_dataproc_metastore_create_service]
+
+With this configuration we can create the service:
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateServiceOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_metastore_create_service_operator]
+    :end-before: [END how_to_cloud_dataproc_metastore_create_service_operator]
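+
+As an inline sketch, the operator call might look as follows (the project, region,
+and service values here are placeholders, not part of the example DAG; ``port`` is
+one of the ``Service`` fields described in the API reference above):
+
+.. code-block:: python
+
+    create_service = DataprocMetastoreCreateServiceOperator(
+        task_id="create_service",
+        project_id="my-project",  # placeholder project ID
+        region="europe-west1",  # placeholder region
+        service={"port": 9083},  # minimal config; the ``name`` field is ignored on create
+        service_id="my-metastore-service",  # placeholder service ID
+        timeout=2400.0,  # service creation can take a while
+    )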
+
+Get a service
+-------------
+
+To get a service you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreGetServiceOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_metastore_get_service_operator]
+    :end-before: [END how_to_cloud_dataproc_metastore_get_service_operator]
+
+Update a service
+----------------
+
+You can update the service by providing a service config and an updateMask.
+In the updateMask argument you specify the path, relative to Service, of the field to update.
+For more information on updateMask and other parameters, take a look at the `Dataproc Metastore update service API <https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services/patch>`__.
+
+An example of a new service config and the updateMask:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_metastore_update_service]
+    :end-before: [END how_to_cloud_dataproc_metastore_update_service]
+
+To update a service you can use:
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreUpdateServiceOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_metastore_update_service_operator]
+    :end-before: [END how_to_cloud_dataproc_metastore_update_service_operator]
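+
+As an inline sketch (placeholder values; ``FieldMask`` comes from
+``google.protobuf.field_mask_pb2``, and only the fields named in the mask are merged):
+
+.. code-block:: python
+
+    from google.protobuf.field_mask_pb2 import FieldMask
+
+    update_service = DataprocMetastoreUpdateServiceOperator(
+        task_id="update_service",
+        project_id="my-project",  # placeholder
+        region="europe-west1",  # placeholder
+        service_id="my-metastore-service",  # placeholder
+        service={"labels": {"env": "test"}},  # new values for the masked fields
+        update_mask=FieldMask(paths=["labels"]),  # paths are relative to Service
+    )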
+
+Delete a service
+----------------
+
+To delete a service you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteServiceOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_metastore_delete_service_operator]
+    :end-before: [END how_to_cloud_dataproc_metastore_delete_service_operator]
+
+Export a service metadata
+-------------------------
+
+To export metadata you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreExportMetadataOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_metastore_export_metadata_operator]
+    :end-before: [END how_to_cloud_dataproc_metastore_export_metadata_operator]
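+
+A sketch of the call (bucket and IDs are placeholders); per the operator docs, a
+sub-folder containing the exported files is created below ``destination_gcs_folder``:
+
+.. code-block:: python
+
+    export_metadata = DataprocMetastoreExportMetadataOperator(
+        task_id="export_metadata",
+        destination_gcs_folder="gs://my-bucket/exports",  # placeholder bucket path
+        project_id="my-project",  # placeholder
+        region="europe-west1",  # placeholder
+        service_id="my-metastore-service",  # placeholder
+    )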
+
+Restore a service
+-----------------
+
+To restore a service you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreRestoreServiceOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_metastore_restore_service_operator]
+    :end-before: [END how_to_cloud_dataproc_metastore_restore_service_operator]
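+
+A sketch of a restore call (all IDs are placeholders); the ``backup_*`` arguments
+identify the backup to restore from:
+
+.. code-block:: python
+
+    restore_service = DataprocMetastoreRestoreServiceOperator(
+        task_id="restore_service",
+        project_id="my-project",  # placeholder
+        region="europe-west1",  # placeholder
+        service_id="my-metastore-service",  # placeholder
+        backup_project_id="my-project",  # project that holds the backup
+        backup_region="europe-west1",  # region that holds the backup
+        backup_service_id="my-metastore-service",  # service the backup belongs to
+        backup_id="my-backup-id",  # placeholder backup ID
+    )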
+
+Create a metadata import
+------------------------
+
+Before you create a Dataproc Metastore metadata import, you need to define the metadata import.
+For more information about the available fields to pass when creating a metadata import, visit the `Dataproc Metastore create metadata import API <https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services.metadataImports#MetadataImport>`__.
+
+A simple metadata import configuration can look as follows:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_metastore_create_metadata_import]
+    :end-before: [END how_to_cloud_dataproc_metastore_create_metadata_import]
+
+To create a metadata import you can use:
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateMetadataImportOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_metastore_create_metadata_import_operator]
+    :end-before: [END how_to_cloud_dataproc_metastore_create_metadata_import_operator]
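+
+As a rough inline equivalent (URI and IDs are placeholders; the ``database_dump``
+shape follows the MetadataImport API linked above):
+
+.. code-block:: python
+
+    import_metadata = DataprocMetastoreCreateMetadataImportOperator(
+        task_id="import_metadata",
+        project_id="my-project",  # placeholder
+        region="europe-west1",  # placeholder
+        service_id="my-metastore-service",  # placeholder
+        metadata_import={
+            "name": "my-import",  # placeholder import name
+            "database_dump": {
+                "gcs_uri": "gs://my-bucket/hive.sql",  # placeholder dump location
+                "database_type": "MYSQL",
+            },
+        },
+        metadata_import_id="my-import-id",  # placeholder import ID
+    )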
+
+Create a Backup
+---------------
+
+Before you create a Dataproc Metastore backup of the service, you need to define the backup.
+For more information about the available fields to pass when creating a backup, visit the `Dataproc Metastore create backup API <https://cloud.google.com/dataproc-metastore/docs/reference/rest/v1/projects.locations.services.backups#Backup>`__.
+
+A simple backup configuration can look as follows:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_metastore_create_backup]
+    :end-before: [END how_to_cloud_dataproc_metastore_create_backup]
+
+With this configuration we can create the backup:
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreCreateBackupOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_metastore_create_backup_operator]
+    :end-before: [END how_to_cloud_dataproc_metastore_create_backup_operator]
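+
+An inline sketch with placeholder IDs (the ``backup`` fields are per the Backup API
+linked above; an empty dict is shown here only for illustration):
+
+.. code-block:: python
+
+    create_backup = DataprocMetastoreCreateBackupOperator(
+        task_id="create_backup",
+        project_id="my-project",  # placeholder
+        region="europe-west1",  # placeholder
+        service_id="my-metastore-service",  # placeholder
+        backup={},  # minimal placeholder; see the Backup API for available fields
+        backup_id="my-backup-id",  # placeholder backup ID
+    )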
+
+Delete a backup
+---------------
+
+To delete a backup you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDeleteBackupOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_metastore_delete_backup_operator]
+    :end-before: [END how_to_cloud_dataproc_metastore_delete_backup_operator]
+
+List backups
+------------
+
+To list backups you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreListBackupsOperator`
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_metastore_list_backups_operator]
+    :end-before: [END how_to_cloud_dataproc_metastore_list_backups_operator]
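+
+The operator returns the backups as a list of dicts, so a downstream task can pull
+them from XCom (a sketch; the task and service IDs are placeholders):
+
+.. code-block:: python
+
+    list_backups = DataprocMetastoreListBackupsOperator(
+        task_id="list_backups",
+        project_id="my-project",  # placeholder
+        region="europe-west1",  # placeholder
+        service_id="my-metastore-service",  # placeholder
+    )
+    # downstream: backups = task_instance.xcom_pull(task_ids="list_backups")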
diff --git a/setup.py b/setup.py
index 70b715b..7542a7d 100644
--- a/setup.py
+++ b/setup.py
@@ -307,6 +307,7 @@ google = [
     'google-cloud-container>=0.1.1,<2.0.0',
     'google-cloud-datacatalog>=3.0.0,<4.0.0',
     'google-cloud-dataproc>=2.2.0,<4.0.0',
+    'google-cloud-dataproc-metastore>=1.2.0,<2.0.0',
     'google-cloud-dlp>=0.11.0,<2.0.0',
     'google-cloud-kms>=2.0.0,<3.0.0',
     'google-cloud-language>=1.1.1,<2.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_dataproc_metastore.py b/tests/providers/google/cloud/hooks/test_dataproc_metastore.py
new file mode 100644
index 0000000..cd8602c
--- /dev/null
+++ b/tests/providers/google/cloud/hooks/test_dataproc_metastore.py
@@ -0,0 +1,489 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from unittest import TestCase, mock
+
+from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook
+from tests.providers.google.cloud.utils.base_gcp_mock import (
+    mock_base_gcp_hook_default_project_id,
+    mock_base_gcp_hook_no_default_project_id,
+)
+
+TEST_GCP_CONN_ID: str = "test-gcp-conn-id"
+TEST_REGION: str = "test-region"
+TEST_PROJECT_ID: str = "test-project-id"
+TEST_BACKUP: str = "test-backup"
+TEST_BACKUP_ID: str = "test-backup-id"
+TEST_METADATA_IMPORT: dict = {
+    "name": "test-metadata-import",
+    "database_dump": {
+        "gcs_uri": "gs://bucket_name/path_inside_bucket",
+        "database_type": "MYSQL",
+    },
+}
+TEST_METADATA_IMPORT_ID: str = "test-metadata-import-id"
+TEST_SERVICE: dict = {"name": "test-service"}
+TEST_SERVICE_ID: str = "test-service-id"
+TEST_SERVICE_TO_UPDATE = {
+    "labels": {
+        "first_key": "first_value",
+        "second_key": "second_value",
+    }
+}
+TEST_UPDATE_MASK: dict = {"paths": ["labels"]}
+TEST_PARENT: str = "projects/{}/locations/{}"
+TEST_PARENT_SERVICES: str = "projects/{}/locations/{}/services/{}"
+TEST_PARENT_BACKUPS: str = "projects/{}/locations/{}/services/{}/backups"
+TEST_NAME_BACKUPS: str = "projects/{}/locations/{}/services/{}/backups/{}"
+TEST_DESTINATION_GCS_FOLDER: str = "gs://bucket_name/path_inside_bucket"
+
+BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
+DATAPROC_METASTORE_STRING = "airflow.providers.google.cloud.hooks.dataproc_metastore.{}"
+
+
+class TestDataprocMetastoreWithDefaultProjectIdHook(TestCase):
+    def setUp(self):
+        with mock.patch(
+            BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_base_gcp_hook_default_project_id
+        ):
+            self.hook = DataprocMetastoreHook(gcp_conn_id=TEST_GCP_CONN_ID)
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_create_backup(self, mock_client) -> None:
+        self.hook.create_backup(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+            backup=TEST_BACKUP,
+            backup_id=TEST_BACKUP_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.create_backup.assert_called_once_with(
+            request=dict(
+                parent=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                backup=TEST_BACKUP,
+                backup_id=TEST_BACKUP_ID,
+                request_id=None,
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_create_metadata_import(self, mock_client) -> None:
+        self.hook.create_metadata_import(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+            metadata_import=TEST_METADATA_IMPORT,
+            metadata_import_id=TEST_METADATA_IMPORT_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.create_metadata_import.assert_called_once_with(
+            request=dict(
+                parent=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                metadata_import=TEST_METADATA_IMPORT,
+                metadata_import_id=TEST_METADATA_IMPORT_ID,
+                request_id=None,
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_create_service(self, mock_client) -> None:
+        self.hook.create_service(
+            region=TEST_REGION,
+            project_id=TEST_PROJECT_ID,
+            service=TEST_SERVICE,
+            service_id=TEST_SERVICE_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.create_service.assert_called_once_with(
+            request=dict(
+                parent=TEST_PARENT.format(TEST_PROJECT_ID, TEST_REGION),
+                service_id=TEST_SERVICE_ID,
+                service=TEST_SERVICE,
+                request_id=None,
+            ),
+            metadata=(),
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_delete_backup(self, mock_client) -> None:
+        self.hook.delete_backup(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+            backup_id=TEST_BACKUP_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.delete_backup.assert_called_once_with(
+            request=dict(
+                name=TEST_NAME_BACKUPS.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID, TEST_BACKUP_ID),
+                request_id=None,
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_delete_service(self, mock_client) -> None:
+        self.hook.delete_service(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.delete_service.assert_called_once_with(
+            request=dict(
+                name=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                request_id=None,
+            ),
+            retry=None,
+            timeout=None,
+            metadata=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_export_metadata(self, mock_client) -> None:
+        self.hook.export_metadata(
+            destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.export_metadata.assert_called_once_with(
+            request=dict(
+                destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+                service=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                request_id=None,
+                database_dump_type=None,
+            ),
+            retry=None,
+            timeout=None,
+            metadata=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_get_service(self, mock_client) -> None:
+        self.hook.get_service(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.get_service.assert_called_once_with(
+            request=dict(
+                name=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_list_backups(self, mock_client) -> None:
+        self.hook.list_backups(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.list_backups.assert_called_once_with(
+            request=dict(
+                parent=TEST_PARENT_BACKUPS.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                page_size=None,
+                page_token=None,
+                filter=None,
+                order_by=None,
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_restore_service(self, mock_client) -> None:
+        self.hook.restore_service(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+            backup_project_id=TEST_PROJECT_ID,
+            backup_region=TEST_REGION,
+            backup_service_id=TEST_SERVICE_ID,
+            backup_id=TEST_BACKUP_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.restore_service.assert_called_once_with(
+            request=dict(
+                service=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                backup=TEST_NAME_BACKUPS.format(
+                    TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID, TEST_BACKUP_ID
+                ),
+                restore_type=None,
+                request_id=None,
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_update_service(self, mock_client) -> None:
+        self.hook.update_service(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+            service=TEST_SERVICE_TO_UPDATE,
+            update_mask=TEST_UPDATE_MASK,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.update_service.assert_called_once_with(
+            request=dict(
+                service=TEST_SERVICE_TO_UPDATE,
+                update_mask=TEST_UPDATE_MASK,
+                request_id=None,
+            ),
+            retry=None,
+            timeout=None,
+            metadata=None,
+        )
+
+
+class TestDataprocMetastoreWithoutDefaultProjectIdHook(TestCase):
+    def setUp(self):
+        with mock.patch(
+            BASE_STRING.format("GoogleBaseHook.__init__"), new=mock_base_gcp_hook_no_default_project_id
+        ):
+            self.hook = DataprocMetastoreHook(gcp_conn_id=TEST_GCP_CONN_ID)
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_create_backup(self, mock_client) -> None:
+        self.hook.create_backup(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+            backup=TEST_BACKUP,
+            backup_id=TEST_BACKUP_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.create_backup.assert_called_once_with(
+            request=dict(
+                parent=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                backup=TEST_BACKUP,
+                backup_id=TEST_BACKUP_ID,
+                request_id=None,
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_create_metadata_import(self, mock_client) -> None:
+        self.hook.create_metadata_import(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+            metadata_import=TEST_METADATA_IMPORT,
+            metadata_import_id=TEST_METADATA_IMPORT_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.create_metadata_import.assert_called_once_with(
+            request=dict(
+                parent=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                metadata_import=TEST_METADATA_IMPORT,
+                metadata_import_id=TEST_METADATA_IMPORT_ID,
+                request_id=None,
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_create_service(self, mock_client) -> None:
+        self.hook.create_service(
+            region=TEST_REGION,
+            project_id=TEST_PROJECT_ID,
+            service=TEST_SERVICE,
+            service_id=TEST_SERVICE_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.create_service.assert_called_once_with(
+            request=dict(
+                parent=TEST_PARENT.format(TEST_PROJECT_ID, TEST_REGION),
+                service_id=TEST_SERVICE_ID,
+                service=TEST_SERVICE,
+                request_id=None,
+            ),
+            metadata=(),
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_delete_backup(self, mock_client) -> None:
+        self.hook.delete_backup(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+            backup_id=TEST_BACKUP_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.delete_backup.assert_called_once_with(
+            request=dict(
+                name=TEST_NAME_BACKUPS.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID, TEST_BACKUP_ID),
+                request_id=None,
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_delete_service(self, mock_client) -> None:
+        self.hook.delete_service(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.delete_service.assert_called_once_with(
+            request=dict(
+                name=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                request_id=None,
+            ),
+            retry=None,
+            timeout=None,
+            metadata=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_export_metadata(self, mock_client) -> None:
+        self.hook.export_metadata(
+            destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.export_metadata.assert_called_once_with(
+            request=dict(
+                destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+                service=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                request_id=None,
+                database_dump_type=None,
+            ),
+            retry=None,
+            timeout=None,
+            metadata=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_get_service(self, mock_client) -> None:
+        self.hook.get_service(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.get_service.assert_called_once_with(
+            request=dict(
+                name=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_list_backups(self, mock_client) -> None:
+        self.hook.list_backups(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.list_backups.assert_called_once_with(
+            request=dict(
+                parent=TEST_PARENT_BACKUPS.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                page_size=None,
+                page_token=None,
+                filter=None,
+                order_by=None,
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_restore_service(self, mock_client) -> None:
+        self.hook.restore_service(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+            backup_project_id=TEST_PROJECT_ID,
+            backup_region=TEST_REGION,
+            backup_service_id=TEST_SERVICE_ID,
+            backup_id=TEST_BACKUP_ID,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.restore_service.assert_called_once_with(
+            request=dict(
+                service=TEST_PARENT_SERVICES.format(TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID),
+                backup=TEST_NAME_BACKUPS.format(
+                    TEST_PROJECT_ID, TEST_REGION, TEST_SERVICE_ID, TEST_BACKUP_ID
+                ),
+                restore_type=None,
+                request_id=None,
+            ),
+            metadata=None,
+            retry=None,
+            timeout=None,
+        )
+
+    @mock.patch(DATAPROC_METASTORE_STRING.format("DataprocMetastoreHook.get_dataproc_metastore_client"))
+    def test_update_service(self, mock_client) -> None:
+        self.hook.update_service(
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            service_id=TEST_SERVICE_ID,
+            service=TEST_SERVICE_TO_UPDATE,
+            update_mask=TEST_UPDATE_MASK,
+        )
+        mock_client.assert_called_once()
+        mock_client.return_value.update_service.assert_called_once_with(
+            request=dict(
+                service=TEST_SERVICE_TO_UPDATE,
+                update_mask=TEST_UPDATE_MASK,
+                request_id=None,
+            ),
+            retry=None,
+            timeout=None,
+            metadata=None,
+        )
diff --git a/tests/providers/google/cloud/operators/test_dataproc_metastore.py b/tests/providers/google/cloud/operators/test_dataproc_metastore.py
new file mode 100644
index 0000000..652b983
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_dataproc_metastore.py
@@ -0,0 +1,396 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase, mock
+
+from google.api_core.retry import Retry
+
+from airflow.providers.google.cloud.operators.dataproc_metastore import (
+    DataprocMetastoreCreateBackupOperator,
+    DataprocMetastoreCreateMetadataImportOperator,
+    DataprocMetastoreCreateServiceOperator,
+    DataprocMetastoreDeleteBackupOperator,
+    DataprocMetastoreDeleteServiceOperator,
+    DataprocMetastoreExportMetadataOperator,
+    DataprocMetastoreGetServiceOperator,
+    DataprocMetastoreListBackupsOperator,
+    DataprocMetastoreRestoreServiceOperator,
+    DataprocMetastoreUpdateServiceOperator,
+)
+
+TASK_ID: str = "task_id"
+GCP_LOCATION: str = "test-location"
+GCP_PROJECT_ID: str = "test-project-id"
+
+GCP_CONN_ID: str = "test-gcp-conn-id"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+
+TEST_SERVICE: dict = {"name": "test-service"}
+TEST_SERVICE_ID: str = "test-service-id"
+
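+# Sentinel call options: the operator tests only assert that these values are
+# forwarded to the hook unchanged, not that retries or timeouts actually occur.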
+TEST_TIMEOUT = 120
+TEST_RETRY = mock.MagicMock(Retry)
+TEST_METADATA = [("key", "value")]
+TEST_REQUEST_ID = "request_id_uuid"
+
+TEST_BACKUP: dict = {"name": "test-backup"}
+TEST_BACKUP_ID: str = "test-backup-id"
+TEST_METADATA_IMPORT: dict = {
+    "name": "test-metadata-import",
+    "database_dump": {
+        "gcs_uri": "gs://bucket_name/path_inside_bucket",
+        "database_type": "MYSQL",
+    },
+}
+TEST_METADATA_IMPORT_ID: str = "test-metadata-import-id"
+TEST_SERVICE_TO_UPDATE = {
+    "labels": {
+        "first_key": "first_value",
+        "second_key": "second_value",
+    }
+}
+TEST_UPDATE_MASK: dict = {"paths": ["labels"]}
+TEST_DESTINATION_GCS_FOLDER: str = "gs://bucket_name/path_inside_bucket"
+
+
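+# Each operator test follows the same pattern: patch DataprocMetastoreHook at
+# the path where the operator module imports it, execute the operator against
+# a mocked context, then assert that the hook was constructed with the expected
+# connection and impersonation settings and that its method received the
+# operator's arguments unchanged.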
+class TestDataprocMetastoreCreateBackupOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.Backup")
+    def test_assert_valid_hook_call(self, mock_backup, mock_hook) -> None:
+        task = DataprocMetastoreCreateBackupOperator(
+            task_id=TASK_ID,
+            project_id=GCP_PROJECT_ID,
+            region=GCP_LOCATION,
+            backup=TEST_BACKUP,
+            backup_id=TEST_BACKUP_ID,
+            service_id=TEST_SERVICE_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.wait_for_operation.return_value = None
+        mock_backup.return_value.to_dict.return_value = None
+        task.execute(context=mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.create_backup.assert_called_once_with(
+            project_id=GCP_PROJECT_ID,
+            region=GCP_LOCATION,
+            backup=TEST_BACKUP,
+            backup_id=TEST_BACKUP_ID,
+            service_id=TEST_SERVICE_ID,
+            request_id=None,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestDataprocMetastoreCreateMetadataImportOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.MetadataImport")
+    def test_assert_valid_hook_call(self, mock_metadata_import, mock_hook) -> None:
+        task = DataprocMetastoreCreateMetadataImportOperator(
+            task_id=TASK_ID,
+            project_id=GCP_PROJECT_ID,
+            region=GCP_LOCATION,
+            service_id=TEST_SERVICE_ID,
+            metadata_import=TEST_METADATA_IMPORT,
+            metadata_import_id=TEST_METADATA_IMPORT_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.wait_for_operation.return_value = None
+        mock_metadata_import.return_value.to_dict.return_value = None
+        task.execute(context=mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.create_metadata_import.assert_called_once_with(
+            project_id=GCP_PROJECT_ID,
+            region=GCP_LOCATION,
+            service_id=TEST_SERVICE_ID,
+            metadata_import=TEST_METADATA_IMPORT,
+            metadata_import_id=TEST_METADATA_IMPORT_ID,
+            request_id=None,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestDataprocMetastoreCreateServiceOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.Service")
+    def test_execute(self, mock_service, mock_hook) -> None:
+        task = DataprocMetastoreCreateServiceOperator(
+            task_id=TASK_ID,
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT_ID,
+            service=TEST_SERVICE,
+            service_id=TEST_SERVICE_ID,
+            request_id=TEST_REQUEST_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.wait_for_operation.return_value = None
+        mock_service.return_value.to_dict.return_value = None
+        task.execute(context=mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.create_service.assert_called_once_with(
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT_ID,
+            service=TEST_SERVICE,
+            service_id=TEST_SERVICE_ID,
+            request_id=TEST_REQUEST_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestDataprocMetastoreDeleteBackupOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+    def test_assert_valid_hook_call(self, mock_hook) -> None:
+        task = DataprocMetastoreDeleteBackupOperator(
+            task_id=TASK_ID,
+            project_id=GCP_PROJECT_ID,
+            region=GCP_LOCATION,
+            retry=TEST_RETRY,
+            service_id=TEST_SERVICE_ID,
+            backup_id=TEST_BACKUP_ID,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.wait_for_operation.return_value = None
+        task.execute(context=mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.delete_backup.assert_called_once_with(
+            project_id=GCP_PROJECT_ID,
+            region=GCP_LOCATION,
+            service_id=TEST_SERVICE_ID,
+            backup_id=TEST_BACKUP_ID,
+            request_id=None,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestDataprocMetastoreDeleteServiceOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+    def test_execute(self, mock_hook) -> None:
+        task = DataprocMetastoreDeleteServiceOperator(
+            task_id=TASK_ID,
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT_ID,
+            service_id=TEST_SERVICE_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.wait_for_operation.return_value = None
+        task.execute(context=mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.delete_service.assert_called_once_with(
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT_ID,
+            service_id=TEST_SERVICE_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestDataprocMetastoreExportMetadataOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.MetadataExport")
+    @mock.patch(
+        "airflow.providers.google.cloud.operators.dataproc_metastore"
+        ".DataprocMetastoreExportMetadataOperator._wait_for_export_metadata"
+    )
+    def test_assert_valid_hook_call(self, mock_wait, mock_export_metadata, mock_hook) -> None:
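+        # _wait_for_export_metadata is patched out above, so this test checks
+        # only the hook call and does not poll the export operation state.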
+        task = DataprocMetastoreExportMetadataOperator(
+            task_id=TASK_ID,
+            service_id=TEST_SERVICE_ID,
+            destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+            project_id=GCP_PROJECT_ID,
+            region=GCP_LOCATION,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_wait.return_value = None
+        mock_export_metadata.return_value.to_dict.return_value = None
+        task.execute(context=mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.export_metadata.assert_called_once_with(
+            database_dump_type=None,
+            destination_gcs_folder=TEST_DESTINATION_GCS_FOLDER,
+            project_id=GCP_PROJECT_ID,
+            region=GCP_LOCATION,
+            service_id=TEST_SERVICE_ID,
+            request_id=None,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestDataprocMetastoreGetServiceOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.Service")
+    def test_execute(self, mock_service, mock_hook) -> None:
+        task = DataprocMetastoreGetServiceOperator(
+            task_id=TASK_ID,
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT_ID,
+            service_id=TEST_SERVICE_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.wait_for_operation.return_value = None
+        mock_service.return_value.to_dict.return_value = None
+        task.execute(context=mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.get_service.assert_called_once_with(
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT_ID,
+            service_id=TEST_SERVICE_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestDataprocMetastoreListBackupsOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.Backup")
+    def test_assert_valid_hook_call(self, mock_backup, mock_hook) -> None:
+        task = DataprocMetastoreListBackupsOperator(
+            task_id=TASK_ID,
+            project_id=GCP_PROJECT_ID,
+            region=GCP_LOCATION,
+            service_id=TEST_SERVICE_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.wait_for_operation.return_value = None
+        mock_backup.return_value.to_dict.return_value = None
+        task.execute(context=mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.list_backups.assert_called_once_with(
+            project_id=GCP_PROJECT_ID,
+            region=GCP_LOCATION,
+            service_id=TEST_SERVICE_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            filter=None,
+            order_by=None,
+            page_size=None,
+            page_token=None,
+        )
+
+
+class TestDataprocMetastoreRestoreServiceOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+    @mock.patch(
+        "airflow.providers.google.cloud.operators.dataproc_metastore"
+        ".DataprocMetastoreRestoreServiceOperator._wait_for_restore_service"
+    )
+    def test_assert_valid_hook_call(self, mock_wait, mock_hook) -> None:
+        task = DataprocMetastoreRestoreServiceOperator(
+            task_id=TASK_ID,
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT_ID,
+            service_id=TEST_SERVICE_ID,
+            backup_id=TEST_BACKUP_ID,
+            backup_region=GCP_LOCATION,
+            backup_project_id=GCP_PROJECT_ID,
+            backup_service_id=TEST_SERVICE_ID,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        mock_wait.return_value = None
+        task.execute(context=mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.restore_service.assert_called_once_with(
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT_ID,
+            service_id=TEST_SERVICE_ID,
+            backup_id=TEST_BACKUP_ID,
+            backup_region=GCP_LOCATION,
+            backup_project_id=GCP_PROJECT_ID,
+            backup_service_id=TEST_SERVICE_ID,
+            restore_type=None,
+            request_id=None,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+
+
+class TestDataprocMetastoreUpdateServiceOperator(TestCase):
+    @mock.patch("airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreHook")
+    def test_assert_valid_hook_call(self, mock_hook) -> None:
+        task = DataprocMetastoreUpdateServiceOperator(
+            task_id=TASK_ID,
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT_ID,
+            service_id=TEST_SERVICE_ID,
+            service=TEST_SERVICE_TO_UPDATE,
+            update_mask=TEST_UPDATE_MASK,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        task.execute(context=mock.MagicMock())
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.update_service.assert_called_once_with(
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT_ID,
+            service_id=TEST_SERVICE_ID,
+            service=TEST_SERVICE_TO_UPDATE,
+            update_mask=TEST_UPDATE_MASK,
+            request_id=None,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
diff --git a/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py b/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py
new file mode 100644
index 0000000..3c1ad88
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_dataproc_metastore_system.py
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from airflow.providers.google.cloud.example_dags.example_dataproc_metastore import BUCKET
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_DATAPROC_KEY
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_DATAPROC_KEY)
+class DataprocMetastoreExampleDagsTest(GoogleSystemTest):
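+    # Runs the example Dataproc Metastore DAG against real GCP resources,
+    # creating the GCS bucket the example DAG relies on before the run and
+    # deleting it afterwards.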
+    @provide_gcp_context(GCP_DATAPROC_KEY)
+    def setUp(self):
+        super().setUp()
+        self.create_gcs_bucket(BUCKET)
+
+    @provide_gcp_context(GCP_DATAPROC_KEY)
+    def tearDown(self):
+        self.delete_gcs_bucket(BUCKET)
+        super().tearDown()
+
+    @provide_gcp_context(GCP_DATAPROC_KEY)
+    def test_run_example_dag(self):
+        self.run_dag(dag_id="example_gcp_dataproc_metastore", dag_folder=CLOUD_DAG_FOLDER)