You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/02/20 23:01:21 UTC

[airflow] branch main updated: Update google cloud dlp package and adjust hook and operators (#29234)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3dbcf99d20 Update google cloud dlp package and adjust hook and operators (#29234)
3dbcf99d20 is described below

commit 3dbcf99d20d47cde0debdd5faf9bd9b2ebde1718
Author: Łukasz Wyszomirski <wy...@google.com>
AuthorDate: Tue Feb 21 00:01:13 2023 +0100

    Update google cloud dlp package and adjust hook and operators (#29234)
---
 airflow/providers/google/cloud/hooks/dlp.py        | 373 ++++++++++++++-------
 airflow/providers/google/cloud/operators/dlp.py    |  63 ++--
 airflow/providers/google/provider.yaml             |   4 +-
 generated/provider_dependencies.json               |   2 +-
 tests/providers/google/cloud/hooks/test_dlp.py     | 327 ++++++++++++------
 tests/providers/google/cloud/operators/test_dlp.py |  41 ++-
 6 files changed, 534 insertions(+), 276 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/dlp.py b/airflow/providers/google/cloud/hooks/dlp.py
index cf3883516e..b7a71f5dd1 100644
--- a/airflow/providers/google/cloud/hooks/dlp.py
+++ b/airflow/providers/google/cloud/hooks/dlp.py
@@ -33,7 +33,7 @@ from typing import Sequence
 
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry
-from google.cloud.dlp_v2 import DlpServiceClient
+from google.cloud.dlp import DlpServiceClient
 from google.cloud.dlp_v2.types import (
     ByteContentItem,
     ContentItem,
@@ -41,7 +41,6 @@ from google.cloud.dlp_v2.types import (
     DeidentifyContentResponse,
     DeidentifyTemplate,
     DlpJob,
-    FieldMask,
     InspectConfig,
     InspectContentResponse,
     InspectJobConfig,
@@ -55,6 +54,7 @@ from google.cloud.dlp_v2.types import (
     StoredInfoType,
     StoredInfoTypeConfig,
 )
+from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow.exceptions import AirflowException
 from airflow.providers.google.common.consts import CLIENT_INFO
@@ -101,7 +101,7 @@ class CloudDLPHook(GoogleBaseHook):
             delegate_to=delegate_to,
             impersonation_chain=impersonation_chain,
         )
-        self._client = None
+        self._client: DlpServiceClient | None = None
 
     def get_conn(self) -> DlpServiceClient:
         """
@@ -113,6 +113,15 @@ class CloudDLPHook(GoogleBaseHook):
             self._client = DlpServiceClient(credentials=self.get_credentials(), client_info=CLIENT_INFO)
         return self._client
 
+    def _project_deidentify_template_path(self, project_id, template_id):
+        return f"{DlpServiceClient.common_project_path(project_id)}/deidentifyTemplates/{template_id}"
+
+    def _project_stored_info_type_path(self, project_id, info_type_id):
+        return f"{DlpServiceClient.common_project_path(project_id)}/storedInfoTypes/{info_type_id}"
+
+    def _project_inspect_template_path(self, project_id, inspect_template_id):
+        return f"{DlpServiceClient.common_project_path(project_id)}/inspectTemplates/{inspect_template_id}"
+
     @GoogleBaseHook.fallback_to_default_project_id
     def cancel_dlp_job(
         self,
@@ -142,7 +151,14 @@ class CloudDLPHook(GoogleBaseHook):
             raise AirflowException("Please provide the ID of the DLP job resource to be cancelled.")
 
         name = DlpServiceClient.dlp_job_path(project_id, dlp_job_id)
-        client.cancel_dlp_job(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        client.cancel_dlp_job(
+            request=dict(
+                name=name,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
 
     def create_deidentify_template(
         self,
@@ -177,16 +193,18 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            parent = DlpServiceClient.organization_path(organization_id)
+            parent = DlpServiceClient.common_organization_path(organization_id)
         elif project_id:
-            parent = DlpServiceClient.project_path(project_id)
+            parent = DlpServiceClient.common_project_path(project_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
         return client.create_deidentify_template(
-            parent=parent,
-            deidentify_template=deidentify_template,
-            template_id=template_id,
+            request=dict(
+                parent=parent,
+                deidentify_template=deidentify_template,
+                template_id=template_id,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -227,12 +245,14 @@ class CloudDLPHook(GoogleBaseHook):
         """
         client = self.get_conn()
 
-        parent = DlpServiceClient.project_path(project_id)
+        parent = DlpServiceClient.common_project_path(project_id)
         job = client.create_dlp_job(
-            parent=parent,
-            inspect_job=inspect_job,
-            risk_job=risk_job,
-            job_id=job_id,
+            request=dict(
+                parent=parent,
+                inspect_job=inspect_job,
+                risk_job=risk_job,
+                job_id=job_id,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -249,7 +269,7 @@ class CloudDLPHook(GoogleBaseHook):
         while wait_until_finished:
             job = self.get_dlp_job(dlp_job_id=job_name, project_id=project_id)
 
-            self.log.info("DLP job %s state: %s.", job.name, DlpJob.JobState.Name(job.state))
+            self.log.info("DLP job %s state: %s.", job.name, job.state)
 
             if job.state == DlpJob.JobState.DONE:
                 return job
@@ -300,16 +320,18 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            parent = DlpServiceClient.organization_path(organization_id)
+            parent = DlpServiceClient.common_organization_path(organization_id)
         elif project_id:
-            parent = DlpServiceClient.project_path(project_id)
+            parent = DlpServiceClient.common_project_path(project_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
         return client.create_inspect_template(
-            parent=parent,
-            inspect_template=inspect_template,
-            template_id=template_id,
+            request=dict(
+                parent=parent,
+                inspect_template=inspect_template,
+                template_id=template_id,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -343,11 +365,13 @@ class CloudDLPHook(GoogleBaseHook):
         """
         client = self.get_conn()
 
-        parent = DlpServiceClient.project_path(project_id)
+        parent = DlpServiceClient.common_project_path(project_id)
         return client.create_job_trigger(
-            parent=parent,
-            job_trigger=job_trigger,
-            trigger_id=trigger_id,
+            request=dict(
+                parent=parent,
+                job_trigger=job_trigger,
+                trigger_id=trigger_id,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -386,16 +410,18 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            parent = DlpServiceClient.organization_path(organization_id)
+            parent = DlpServiceClient.common_organization_path(organization_id)
         elif project_id:
-            parent = DlpServiceClient.project_path(project_id)
+            parent = DlpServiceClient.common_project_path(project_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
         return client.create_stored_info_type(
-            parent=parent,
-            config=config,
-            stored_info_type_id=stored_info_type_id,
+            request=dict(
+                parent=parent,
+                config=config,
+                stored_info_type_id=stored_info_type_id,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -441,14 +467,16 @@ class CloudDLPHook(GoogleBaseHook):
         """
         client = self.get_conn()
 
-        parent = DlpServiceClient.project_path(project_id)
+        parent = DlpServiceClient.common_project_path(project_id)
         return client.deidentify_content(
-            parent=parent,
-            deidentify_config=deidentify_config,
-            inspect_config=inspect_config,
-            item=item,
-            inspect_template_name=inspect_template_name,
-            deidentify_template_name=deidentify_template_name,
+            request=dict(
+                parent=parent,
+                deidentify_config=deidentify_config,
+                inspect_config=inspect_config,
+                item=item,
+                inspect_template_name=inspect_template_name,
+                deidentify_template_name=deidentify_template_name,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -482,13 +510,20 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            name = DlpServiceClient.organization_deidentify_template_path(organization_id, template_id)
+            name = DlpServiceClient.deidentify_template_path(organization_id, template_id)
         elif project_id:
-            name = DlpServiceClient.project_deidentify_template_path(project_id, template_id)
+            name = self._project_deidentify_template_path(project_id, template_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
-        client.delete_deidentify_template(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        client.delete_deidentify_template(
+            request=dict(
+                name=name,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
 
     @GoogleBaseHook.fallback_to_default_project_id
     def delete_dlp_job(
@@ -520,7 +555,14 @@ class CloudDLPHook(GoogleBaseHook):
             raise AirflowException("Please provide the ID of the DLP job resource to be cancelled.")
 
         name = DlpServiceClient.dlp_job_path(project_id, dlp_job_id)
-        client.delete_dlp_job(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        client.delete_dlp_job(
+            request=dict(
+                name=name,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
 
     def delete_inspect_template(
         self,
@@ -556,13 +598,20 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            name = DlpServiceClient.organization_inspect_template_path(organization_id, template_id)
+            name = DlpServiceClient.inspect_template_path(organization_id, template_id)
         elif project_id:
-            name = DlpServiceClient.project_inspect_template_path(project_id, template_id)
+            name = self._project_inspect_template_path(project_id, template_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
-        client.delete_inspect_template(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        client.delete_inspect_template(
+            request=dict(
+                name=name,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
 
     @GoogleBaseHook.fallback_to_default_project_id
     def delete_job_trigger(
@@ -592,8 +641,15 @@ class CloudDLPHook(GoogleBaseHook):
         if not job_trigger_id:
             raise AirflowException("Please provide the ID of the DLP job trigger to be deleted.")
 
-        name = DlpServiceClient.project_job_trigger_path(project_id, job_trigger_id)
-        client.delete_job_trigger(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        name = DlpServiceClient.job_trigger_path(project_id, job_trigger_id)
+        client.delete_job_trigger(
+            request=dict(
+                name=name,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
 
     def delete_stored_info_type(
         self,
@@ -629,13 +685,20 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            name = DlpServiceClient.organization_stored_info_type_path(organization_id, stored_info_type_id)
+            name = DlpServiceClient.stored_info_type_path(organization_id, stored_info_type_id)
         elif project_id:
-            name = DlpServiceClient.project_stored_info_type_path(project_id, stored_info_type_id)
+            name = self._project_stored_info_type_path(project_id, stored_info_type_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
-        client.delete_stored_info_type(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        client.delete_stored_info_type(
+            request=dict(
+                name=name,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
 
     def get_deidentify_template(
         self,
@@ -671,13 +734,20 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            name = DlpServiceClient.organization_deidentify_template_path(organization_id, template_id)
+            name = DlpServiceClient.deidentify_template_path(organization_id, template_id)
         elif project_id:
-            name = DlpServiceClient.project_deidentify_template_path(project_id, template_id)
+            name = self._project_deidentify_template_path(project_id, template_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
-        return client.get_deidentify_template(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        return client.get_deidentify_template(
+            request=dict(
+                name=name,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
 
     @GoogleBaseHook.fallback_to_default_project_id
     def get_dlp_job(
@@ -708,7 +778,14 @@ class CloudDLPHook(GoogleBaseHook):
             raise AirflowException("Please provide the ID of the DLP job resource to be read.")
 
         name = DlpServiceClient.dlp_job_path(project_id, dlp_job_id)
-        return client.get_dlp_job(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        return client.get_dlp_job(
+            request=dict(
+                name=name,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
 
     def get_inspect_template(
         self,
@@ -744,13 +821,20 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            name = DlpServiceClient.organization_inspect_template_path(organization_id, template_id)
+            name = DlpServiceClient.inspect_template_path(organization_id, template_id)
         elif project_id:
-            name = DlpServiceClient.project_inspect_template_path(project_id, template_id)
+            name = self._project_inspect_template_path(project_id, template_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
-        return client.get_inspect_template(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        return client.get_inspect_template(
+            request=dict(
+                name=name,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
 
     @GoogleBaseHook.fallback_to_default_project_id
     def get_job_trigger(
@@ -780,8 +864,15 @@ class CloudDLPHook(GoogleBaseHook):
         if not job_trigger_id:
             raise AirflowException("Please provide the ID of the DLP job trigger to be read.")
 
-        name = DlpServiceClient.project_job_trigger_path(project_id, job_trigger_id)
-        return client.get_job_trigger(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        name = DlpServiceClient.job_trigger_path(project_id, job_trigger_id)
+        return client.get_job_trigger(
+            request=dict(
+                name=name,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
 
     def get_stored_info_type(
         self,
@@ -817,13 +908,20 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            name = DlpServiceClient.organization_stored_info_type_path(organization_id, stored_info_type_id)
+            name = DlpServiceClient.stored_info_type_path(organization_id, stored_info_type_id)
         elif project_id:
-            name = DlpServiceClient.project_stored_info_type_path(project_id, stored_info_type_id)
+            name = self._project_stored_info_type_path(project_id, stored_info_type_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
-        return client.get_stored_info_type(name=name, retry=retry, timeout=timeout, metadata=metadata)
+        return client.get_stored_info_type(
+            request=dict(
+                name=name,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
 
     @GoogleBaseHook.fallback_to_default_project_id
     def inspect_content(
@@ -857,12 +955,14 @@ class CloudDLPHook(GoogleBaseHook):
         """
         client = self.get_conn()
 
-        parent = DlpServiceClient.project_path(project_id)
+        parent = DlpServiceClient.common_project_path(project_id)
         return client.inspect_content(
-            parent=parent,
-            inspect_config=inspect_config,
-            item=item,
-            inspect_template_name=inspect_template_name,
+            request=dict(
+                parent=parent,
+                inspect_config=inspect_config,
+                item=item,
+                inspect_template_name=inspect_template_name,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -903,16 +1003,18 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            parent = DlpServiceClient.organization_path(organization_id)
+            parent = DlpServiceClient.common_organization_path(organization_id)
         elif project_id:
-            parent = DlpServiceClient.project_path(project_id)
+            parent = DlpServiceClient.common_project_path(project_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
         results = client.list_deidentify_templates(
-            parent=parent,
-            page_size=page_size,
-            order_by=order_by,
+            request=dict(
+                parent=parent,
+                page_size=page_size,
+                order_by=order_by,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -953,13 +1055,15 @@ class CloudDLPHook(GoogleBaseHook):
         """
         client = self.get_conn()
 
-        parent = DlpServiceClient.project_path(project_id)
+        parent = DlpServiceClient.common_project_path(project_id)
         results = client.list_dlp_jobs(
-            parent=parent,
-            filter_=results_filter,
-            page_size=page_size,
-            type_=job_type,
-            order_by=order_by,
+            request=dict(
+                parent=parent,
+                filter=results_filter,
+                page_size=page_size,
+                type_=job_type,
+                order_by=order_by,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -991,8 +1095,10 @@ class CloudDLPHook(GoogleBaseHook):
         client = self.get_conn()
 
         return client.list_info_types(
-            language_code=language_code,
-            filter_=results_filter,
+            request=dict(
+                language_code=language_code,
+                filter=results_filter,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -1033,16 +1139,18 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            parent = DlpServiceClient.organization_path(organization_id)
+            parent = DlpServiceClient.common_organization_path(organization_id)
         elif project_id:
-            parent = DlpServiceClient.project_path(project_id)
+            parent = DlpServiceClient.common_project_path(project_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
         results = client.list_inspect_templates(
-            parent=parent,
-            page_size=page_size,
-            order_by=order_by,
+            request=dict(
+                parent=parent,
+                page_size=page_size,
+                order_by=order_by,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -1080,12 +1188,14 @@ class CloudDLPHook(GoogleBaseHook):
         """
         client = self.get_conn()
 
-        parent = DlpServiceClient.project_path(project_id)
+        parent = DlpServiceClient.common_project_path(project_id)
         results = client.list_job_triggers(
-            parent=parent,
-            page_size=page_size,
-            order_by=order_by,
-            filter_=results_filter,
+            request=dict(
+                parent=parent,
+                page_size=page_size,
+                order_by=order_by,
+                filter=results_filter,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -1127,16 +1237,18 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            parent = DlpServiceClient.organization_path(organization_id)
+            parent = DlpServiceClient.common_organization_path(organization_id)
         elif project_id:
-            parent = DlpServiceClient.project_path(project_id)
+            parent = DlpServiceClient.common_project_path(project_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
         results = client.list_stored_info_types(
-            parent=parent,
-            page_size=page_size,
-            order_by=order_by,
+            request=dict(
+                parent=parent,
+                page_size=page_size,
+                order_by=order_by,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -1179,13 +1291,15 @@ class CloudDLPHook(GoogleBaseHook):
         """
         client = self.get_conn()
 
-        parent = DlpServiceClient.project_path(project_id)
+        parent = DlpServiceClient.common_project_path(project_id)
         return client.redact_image(
-            parent=parent,
-            inspect_config=inspect_config,
-            image_redaction_configs=image_redaction_configs,
-            include_findings=include_findings,
-            byte_item=byte_item,
+            request=dict(
+                parent=parent,
+                inspect_config=inspect_config,
+                image_redaction_configs=image_redaction_configs,
+                include_findings=include_findings,
+                byte_item=byte_item,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -1228,14 +1342,16 @@ class CloudDLPHook(GoogleBaseHook):
         """
         client = self.get_conn()
 
-        parent = DlpServiceClient.project_path(project_id)
+        parent = DlpServiceClient.common_project_path(project_id)
         return client.reidentify_content(
-            parent=parent,
-            reidentify_config=reidentify_config,
-            inspect_config=inspect_config,
-            item=item,
-            inspect_template_name=inspect_template_name,
-            reidentify_template_name=reidentify_template_name,
+            request=dict(
+                parent=parent,
+                reidentify_config=reidentify_config,
+                inspect_config=inspect_config,
+                item=item,
+                inspect_template_name=inspect_template_name,
+                reidentify_template_name=reidentify_template_name,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -1279,16 +1395,18 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            name = DlpServiceClient.organization_deidentify_template_path(organization_id, template_id)
+            name = DlpServiceClient.deidentify_template_path(organization_id, template_id)
         elif project_id:
-            name = DlpServiceClient.project_deidentify_template_path(project_id, template_id)
+            name = self._project_deidentify_template_path(project_id, template_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
         return client.update_deidentify_template(
-            name=name,
-            deidentify_template=deidentify_template,
-            update_mask=update_mask,
+            request=dict(
+                name=name,
+                deidentify_template=deidentify_template,
+                update_mask=update_mask,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -1331,16 +1449,18 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            name = DlpServiceClient.organization_inspect_template_path(organization_id, template_id)
+            name = DlpServiceClient.inspect_template_path(organization_id, template_id)
         elif project_id:
-            name = DlpServiceClient.project_inspect_template_path(project_id, template_id)
+            name = self._project_inspect_template_path(project_id, template_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
         return client.update_inspect_template(
-            name=name,
-            inspect_template=inspect_template,
-            update_mask=update_mask,
+            request=dict(
+                name=name,
+                inspect_template=inspect_template,
+                update_mask=update_mask,
+            ),
             retry=retry,
             timeout=timeout,
             metadata=metadata,
@@ -1375,10 +1495,16 @@ class CloudDLPHook(GoogleBaseHook):
         """
         client = self.get_conn()
 
+        if isinstance(job_trigger, dict):
+            job_trigger = JobTrigger(**job_trigger)
+
+        if isinstance(update_mask, dict):
+            update_mask = FieldMask(**update_mask)
+
         if not job_trigger_id:
             raise AirflowException("Please provide the ID of the DLP job trigger to be updated.")
 
-        name = DlpServiceClient.project_job_trigger_path(project_id, job_trigger_id)
+        name = DlpServiceClient.job_trigger_path(project_id, job_trigger_id)
         return client.update_job_trigger(
             name=name,
             job_trigger=job_trigger,
@@ -1427,12 +1553,19 @@ class CloudDLPHook(GoogleBaseHook):
         project_id = project_id or self.project_id
 
         if organization_id:
-            name = DlpServiceClient.organization_stored_info_type_path(organization_id, stored_info_type_id)
+            name = DlpServiceClient.stored_info_type_path(organization_id, stored_info_type_id)
         elif project_id:
-            name = DlpServiceClient.project_stored_info_type_path(project_id, stored_info_type_id)
+            name = self._project_stored_info_type_path(project_id, stored_info_type_id)
         else:
             raise AirflowException("Please provide either organization_id or project_id.")
 
         return client.update_stored_info_type(
-            name=name, config=config, update_mask=update_mask, retry=retry, timeout=timeout, metadata=metadata
+            request=dict(
+                name=name,
+                config=config,
+                update_mask=update_mask,
+            ),
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
         )
diff --git a/airflow/providers/google/cloud/operators/dlp.py b/airflow/providers/google/cloud/operators/dlp.py
index 09ffd21591..2903a3dd69 100644
--- a/airflow/providers/google/cloud/operators/dlp.py
+++ b/airflow/providers/google/cloud/operators/dlp.py
@@ -31,17 +31,23 @@ from google.cloud.dlp_v2.types import (
     ByteContentItem,
     ContentItem,
     DeidentifyConfig,
+    DeidentifyContentResponse,
     DeidentifyTemplate,
-    FieldMask,
+    DlpJob,
     InspectConfig,
+    InspectContentResponse,
     InspectJobConfig,
     InspectTemplate,
     JobTrigger,
+    ListInfoTypesResponse,
     RedactImageRequest,
+    RedactImageResponse,
+    ReidentifyContentResponse,
     RiskAnalysisJobConfig,
+    StoredInfoType,
     StoredInfoTypeConfig,
 )
-from google.protobuf.json_format import MessageToDict
+from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.dlp import CloudDLPHook
@@ -239,7 +245,7 @@ class CloudDLPCreateDeidentifyTemplateOperator(BaseOperator):
                 timeout=self.timeout,
                 metadata=self.metadata,
             )
-        result = MessageToDict(template)
+        result = DeidentifyTemplate.to_dict(template)
 
         project_id = self.project_id or hook.project_id
         template_id = self.template_id or result["name"].split("/")[-1] if result["name"] else None
@@ -352,7 +358,7 @@ class CloudDLPCreateDLPJobOperator(BaseOperator):
                 metadata=self.metadata,
             )
 
-        result = MessageToDict(job)
+        result = DlpJob.to_dict(job)
 
         project_id = self.project_id or hook.project_id
         if project_id:
@@ -462,7 +468,7 @@ class CloudDLPCreateInspectTemplateOperator(BaseOperator):
                 metadata=self.metadata,
             )
 
-        result = MessageToDict(template)
+        result = InspectTemplate.to_dict(template)
 
         template_id = self.template_id or result["name"].split("/")[-1] if result["name"] else None
         project_id = self.project_id or hook.project_id
@@ -568,7 +574,7 @@ class CloudDLPCreateJobTriggerOperator(BaseOperator):
                 metadata=self.metadata,
             )
 
-        result = MessageToDict(trigger)
+        result = JobTrigger.to_dict(trigger)
 
         project_id = self.project_id or hook.project_id
         trigger_name = result["name"].split("/")[-1] if result["name"] else None
@@ -680,7 +686,7 @@ class CloudDLPCreateStoredInfoTypeOperator(BaseOperator):
                 metadata=self.metadata,
             )
 
-        result = MessageToDict(info)
+        result = StoredInfoType.to_dict(info)
 
         project_id = self.project_id or hook.project_id
         stored_info_type_id = (
@@ -794,7 +800,7 @@ class CloudDLPDeidentifyContentOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
-        return MessageToDict(response)
+        return DeidentifyContentResponse.to_dict(response)
 
 
 class CloudDLPDeleteDeidentifyTemplateOperator(BaseOperator):
@@ -1317,7 +1323,7 @@ class CloudDLPGetDeidentifyTemplateOperator(BaseOperator):
                 context=context, task_instance=self, project_id=project_id, template_name=self.template_id
             )
 
-        return MessageToDict(template)
+        return DeidentifyTemplate.to_dict(template)
 
 
 class CloudDLPGetDLPJobOperator(BaseOperator):
@@ -1401,7 +1407,7 @@ class CloudDLPGetDLPJobOperator(BaseOperator):
                 job_name=self.dlp_job_id,
             )
 
-        return MessageToDict(job)
+        return DlpJob.to_dict(job)
 
 
 class CloudDLPGetInspectTemplateOperator(BaseOperator):
@@ -1491,7 +1497,7 @@ class CloudDLPGetInspectTemplateOperator(BaseOperator):
                 template_name=self.template_id,
             )
 
-        return MessageToDict(template)
+        return InspectTemplate.to_dict(template)
 
 
 class CloudDLPGetDLPJobTriggerOperator(BaseOperator):
@@ -1575,7 +1581,7 @@ class CloudDLPGetDLPJobTriggerOperator(BaseOperator):
                 trigger_name=self.job_trigger_id,
             )
 
-        return MessageToDict(trigger)
+        return JobTrigger.to_dict(trigger)
 
 
 class CloudDLPGetStoredInfoTypeOperator(BaseOperator):
@@ -1665,7 +1671,7 @@ class CloudDLPGetStoredInfoTypeOperator(BaseOperator):
                 info_type_name=self.stored_info_type_id,
             )
 
-        return MessageToDict(info)
+        return StoredInfoType.to_dict(info)
 
 
 class CloudDLPInspectContentOperator(BaseOperator):
@@ -1751,7 +1757,7 @@ class CloudDLPInspectContentOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
-        return MessageToDict(response)
+        return InspectContentResponse.to_dict(response)
 
 
 class CloudDLPListDeidentifyTemplatesOperator(BaseOperator):
@@ -1836,7 +1842,6 @@ class CloudDLPListDeidentifyTemplatesOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
-        # the MessageToDict does not have the right type defined as possible to pass in constructor
 
         project_id = self.project_id or hook.project_id
         if project_id:
@@ -1846,7 +1851,7 @@ class CloudDLPListDeidentifyTemplatesOperator(BaseOperator):
                 project_id=project_id,
             )
 
-        return [MessageToDict(template) for template in templates]  # type: ignore[arg-type]
+        return [DeidentifyTemplate.to_dict(template) for template in templates]  # type: ignore[arg-type]
 
 
 class CloudDLPListDLPJobsOperator(BaseOperator):
@@ -1942,8 +1947,8 @@ class CloudDLPListDLPJobsOperator(BaseOperator):
                 project_id=project_id,
             )
 
-        # the MessageToDict does not have the right type defined as possible to pass in constructor
-        return [MessageToDict(job) for job in jobs]  # type: ignore[arg-type]
+        # DlpJob.to_dict is not annotated as accepting these items, so the arg-type check is silenced below
+        return [DlpJob.to_dict(job) for job in jobs]  # type: ignore[arg-type]
 
 
 class CloudDLPListInfoTypesOperator(BaseOperator):
@@ -2027,7 +2032,7 @@ class CloudDLPListInfoTypesOperator(BaseOperator):
                 project_id=project_id,
             )
 
-        return MessageToDict(response)
+        return ListInfoTypesResponse.to_dict(response)
 
 
 class CloudDLPListInspectTemplatesOperator(BaseOperator):
@@ -2121,7 +2126,7 @@ class CloudDLPListInspectTemplatesOperator(BaseOperator):
                 project_id=project_id,
             )
 
-        return [MessageToDict(t) for t in templates]
+        return [InspectTemplate.to_dict(t) for t in templates]
 
 
 class CloudDLPListJobTriggersOperator(BaseOperator):
@@ -2213,7 +2218,7 @@ class CloudDLPListJobTriggersOperator(BaseOperator):
                 project_id=project_id,
             )
 
-        return [MessageToDict(j) for j in jobs]
+        return [JobTrigger.to_dict(j) for j in jobs]
 
 
 class CloudDLPListStoredInfoTypesOperator(BaseOperator):
@@ -2307,7 +2312,7 @@ class CloudDLPListStoredInfoTypesOperator(BaseOperator):
                 project_id=project_id,
             )
 
-        return [MessageToDict(i) for i in infos]
+        return [StoredInfoType.to_dict(i) for i in infos]
 
 
 class CloudDLPRedactImageOperator(BaseOperator):
@@ -2399,7 +2404,7 @@ class CloudDLPRedactImageOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
-        return MessageToDict(response)
+        return RedactImageResponse.to_dict(response)
 
 
 class CloudDLPReidentifyContentOperator(BaseOperator):
@@ -2496,7 +2501,7 @@ class CloudDLPReidentifyContentOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
-        return MessageToDict(response)
+        return ReidentifyContentResponse.to_dict(response)
 
 
 class CloudDLPUpdateDeidentifyTemplateOperator(BaseOperator):
@@ -2596,7 +2601,7 @@ class CloudDLPUpdateDeidentifyTemplateOperator(BaseOperator):
                 template_name=self.template_id,
             )
 
-        return MessageToDict(template)
+        return DeidentifyTemplate.to_dict(template)
 
 
 class CloudDLPUpdateInspectTemplateOperator(BaseOperator):
@@ -2696,7 +2701,7 @@ class CloudDLPUpdateInspectTemplateOperator(BaseOperator):
                 template_name=self.template_id,
             )
 
-        return MessageToDict(template)
+        return InspectTemplate.to_dict(template)
 
 
 class CloudDLPUpdateJobTriggerOperator(BaseOperator):
@@ -2746,7 +2751,7 @@ class CloudDLPUpdateJobTriggerOperator(BaseOperator):
         *,
         job_trigger_id,
         project_id: str | None = None,
-        job_trigger: JobTrigger | None = None,
+        job_trigger: dict | JobTrigger | None = None,
         update_mask: dict | FieldMask | None = None,
         retry: Retry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
@@ -2790,7 +2795,7 @@ class CloudDLPUpdateJobTriggerOperator(BaseOperator):
                 trigger_name=self.job_trigger_id,
             )
 
-        return MessageToDict(trigger)
+        return JobTrigger.to_dict(trigger)
 
 
 class CloudDLPUpdateStoredInfoTypeOperator(BaseOperator):
@@ -2891,4 +2896,4 @@ class CloudDLPUpdateStoredInfoTypeOperator(BaseOperator):
                 info_type_name=self.stored_info_type_id,
             )
 
-        return MessageToDict(info)
+        return StoredInfoType.to_dict(info)
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 70861bbeb3..3326a9cb8d 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -95,7 +95,9 @@ dependencies:
   - google-cloud-dataplex>=0.1.0
   - google-cloud-dataproc>=3.1.0
   - google-cloud-dataproc-metastore>=1.2.0,<2.0.0
-  - google-cloud-dlp>=0.11.0,<2.0.0
+  # google-cloud-dlp version 3.8.0+ requires higher versions of the protobuf and
+  # proto-plus libraries, which can break other dependencies in the current package.
+  - google-cloud-dlp>=3.0.0,<3.8.0
   - google-cloud-kms>=2.0.0
   - google-cloud-language>=1.1.1,<2.0.0
   - google-cloud-logging>=2.1.1
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 4c3c62fe9a..6a914f9ca5 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -346,7 +346,7 @@
       "google-cloud-dataplex>=0.1.0",
       "google-cloud-dataproc-metastore>=1.2.0,<2.0.0",
       "google-cloud-dataproc>=3.1.0",
-      "google-cloud-dlp>=0.11.0,<2.0.0",
+      "google-cloud-dlp>=3.0.0,<3.8.0",
       "google-cloud-kms>=2.0.0",
       "google-cloud-language>=1.1.1,<2.0.0",
       "google-cloud-logging>=2.1.1",
diff --git a/tests/providers/google/cloud/hooks/test_dlp.py b/tests/providers/google/cloud/hooks/test_dlp.py
index a75a5f8009..9c9faeba65 100644
--- a/tests/providers/google/cloud/hooks/test_dlp.py
+++ b/tests/providers/google/cloud/hooks/test_dlp.py
@@ -74,7 +74,7 @@ class TestCloudDLPHook:
         self.hook.cancel_dlp_job(dlp_job_id=DLP_JOB_ID, project_id=PROJECT_ID)
 
         get_conn.return_value.cancel_dlp_job.assert_called_once_with(
-            name=DLP_JOB_PATH, retry=DEFAULT, timeout=None, metadata=()
+            request=dict(name=DLP_JOB_PATH), retry=DEFAULT, timeout=None, metadata=()
         )
 
     @mock.patch("airflow.providers.google.cloud.hooks.dlp.CloudDLPHook.get_conn")
@@ -104,9 +104,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.create_deidentify_template.assert_called_once_with(
-            parent=ORGANIZATION_PATH,
-            deidentify_template=None,
-            template_id=None,
+            request=dict(
+                parent=ORGANIZATION_PATH,
+                deidentify_template=None,
+                template_id=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -119,9 +121,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.create_deidentify_template.assert_called_once_with(
-            parent=PROJECT_PATH,
-            deidentify_template=None,
-            template_id=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                deidentify_template=None,
+                template_id=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -144,10 +148,12 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.create_dlp_job.assert_called_once_with(
-            parent=PROJECT_PATH,
-            inspect_job=None,
-            risk_job=None,
-            job_id=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                inspect_job=None,
+                risk_job=None,
+                job_id=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -173,7 +179,12 @@ class TestCloudDLPHook:
         self.hook.create_dlp_job(project_id=PROJECT_ID)
 
         get_conn.return_value.get_dlp_job.assert_called_once_with(
-            name=DLP_JOB_PATH, retry=DEFAULT, timeout=None, metadata=()
+            request=dict(
+                name=DLP_JOB_PATH,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
         )
 
     @mock.patch(
@@ -188,9 +199,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.create_inspect_template.assert_called_once_with(
-            parent=ORGANIZATION_PATH,
-            inspect_template=None,
-            template_id=None,
+            request=dict(
+                parent=ORGANIZATION_PATH,
+                inspect_template=None,
+                template_id=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -203,9 +216,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.create_inspect_template.assert_called_once_with(
-            parent=PROJECT_PATH,
-            inspect_template=None,
-            template_id=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                inspect_template=None,
+                template_id=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -228,9 +243,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.create_job_trigger.assert_called_once_with(
-            parent=PROJECT_PATH,
-            job_trigger=None,
-            trigger_id=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                job_trigger=None,
+                trigger_id=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -258,9 +275,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.create_stored_info_type.assert_called_once_with(
-            parent=ORGANIZATION_PATH,
-            config=None,
-            stored_info_type_id=None,
+            request=dict(
+                parent=ORGANIZATION_PATH,
+                config=None,
+                stored_info_type_id=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -273,9 +292,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.create_stored_info_type.assert_called_once_with(
-            parent=PROJECT_PATH,
-            config=None,
-            stored_info_type_id=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                config=None,
+                stored_info_type_id=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -298,12 +319,14 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.deidentify_content.assert_called_once_with(
-            parent=PROJECT_PATH,
-            deidentify_config=None,
-            inspect_config=None,
-            item=None,
-            inspect_template_name=None,
-            deidentify_template_name=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                deidentify_config=None,
+                inspect_config=None,
+                item=None,
+                inspect_template_name=None,
+                deidentify_template_name=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -329,7 +352,9 @@ class TestCloudDLPHook:
         self.hook.delete_deidentify_template(template_id=TEMPLATE_ID, organization_id=ORGANIZATION_ID)
 
         get_conn.return_value.delete_deidentify_template.assert_called_once_with(
-            name=DEIDENTIFY_TEMPLATE_ORGANIZATION_PATH,
+            request=dict(
+                name=DEIDENTIFY_TEMPLATE_ORGANIZATION_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -340,7 +365,9 @@ class TestCloudDLPHook:
         self.hook.delete_deidentify_template(template_id=TEMPLATE_ID, project_id=PROJECT_ID)
 
         get_conn.return_value.delete_deidentify_template.assert_called_once_with(
-            name=DEIDENTIFY_TEMPLATE_PROJECT_PATH,
+            request=dict(
+                name=DEIDENTIFY_TEMPLATE_PROJECT_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -366,7 +393,12 @@ class TestCloudDLPHook:
         self.hook.delete_dlp_job(dlp_job_id=DLP_JOB_ID, project_id=PROJECT_ID)
 
         get_conn.return_value.delete_dlp_job.assert_called_once_with(
-            name=DLP_JOB_PATH, retry=DEFAULT, timeout=None, metadata=()
+            request=dict(
+                name=DLP_JOB_PATH,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
         )
 
     @mock.patch("airflow.providers.google.cloud.hooks.dlp.CloudDLPHook.get_conn")
@@ -394,7 +426,9 @@ class TestCloudDLPHook:
         self.hook.delete_inspect_template(template_id=TEMPLATE_ID, organization_id=ORGANIZATION_ID)
 
         get_conn.return_value.delete_inspect_template.assert_called_once_with(
-            name=INSPECT_TEMPLATE_ORGANIZATION_PATH,
+            request=dict(
+                name=INSPECT_TEMPLATE_ORGANIZATION_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -405,7 +439,9 @@ class TestCloudDLPHook:
         self.hook.delete_inspect_template(template_id=TEMPLATE_ID, project_id=PROJECT_ID)
 
         get_conn.return_value.delete_inspect_template.assert_called_once_with(
-            name=INSPECT_TEMPLATE_PROJECT_PATH,
+            request=dict(
+                name=INSPECT_TEMPLATE_PROJECT_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -431,7 +467,12 @@ class TestCloudDLPHook:
         self.hook.delete_job_trigger(job_trigger_id=TRIGGER_ID, project_id=PROJECT_ID)
 
         get_conn.return_value.delete_job_trigger.assert_called_once_with(
-            name=JOB_TRIGGER_PATH, retry=DEFAULT, timeout=None, metadata=()
+            request=dict(
+                name=JOB_TRIGGER_PATH,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
         )
 
     @mock.patch("airflow.providers.google.cloud.hooks.dlp.CloudDLPHook.get_conn")
@@ -461,7 +502,9 @@ class TestCloudDLPHook:
         )
 
         get_conn.return_value.delete_stored_info_type.assert_called_once_with(
-            name=STORED_INFO_TYPE_ORGANIZATION_PATH,
+            request=dict(
+                name=STORED_INFO_TYPE_ORGANIZATION_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -472,7 +515,9 @@ class TestCloudDLPHook:
         self.hook.delete_stored_info_type(stored_info_type_id=STORED_INFO_TYPE_ID, project_id=PROJECT_ID)
 
         get_conn.return_value.delete_stored_info_type.assert_called_once_with(
-            name=STORED_INFO_TYPE_PROJECT_PATH,
+            request=dict(
+                name=STORED_INFO_TYPE_PROJECT_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -505,7 +550,9 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.get_deidentify_template.assert_called_once_with(
-            name=DEIDENTIFY_TEMPLATE_ORGANIZATION_PATH,
+            request=dict(
+                name=DEIDENTIFY_TEMPLATE_ORGANIZATION_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -518,7 +565,9 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.get_deidentify_template.assert_called_once_with(
-            name=DEIDENTIFY_TEMPLATE_PROJECT_PATH,
+            request=dict(
+                name=DEIDENTIFY_TEMPLATE_PROJECT_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -546,7 +595,12 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.get_dlp_job.assert_called_once_with(
-            name=DLP_JOB_PATH, retry=DEFAULT, timeout=None, metadata=()
+            request=dict(
+                name=DLP_JOB_PATH,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
         )
 
     @mock.patch("airflow.providers.google.cloud.hooks.dlp.CloudDLPHook.get_conn")
@@ -576,7 +630,9 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.get_inspect_template.assert_called_once_with(
-            name=INSPECT_TEMPLATE_ORGANIZATION_PATH,
+            request=dict(
+                name=INSPECT_TEMPLATE_ORGANIZATION_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -589,7 +645,9 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.get_inspect_template.assert_called_once_with(
-            name=INSPECT_TEMPLATE_PROJECT_PATH,
+            request=dict(
+                name=INSPECT_TEMPLATE_PROJECT_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -617,7 +675,12 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.get_job_trigger.assert_called_once_with(
-            name=JOB_TRIGGER_PATH, retry=DEFAULT, timeout=None, metadata=()
+            request=dict(
+                name=JOB_TRIGGER_PATH,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
         )
 
     @mock.patch("airflow.providers.google.cloud.hooks.dlp.CloudDLPHook.get_conn")
@@ -649,7 +712,9 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.get_stored_info_type.assert_called_once_with(
-            name=STORED_INFO_TYPE_ORGANIZATION_PATH,
+            request=dict(
+                name=STORED_INFO_TYPE_ORGANIZATION_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -664,7 +729,9 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.get_stored_info_type.assert_called_once_with(
-            name=STORED_INFO_TYPE_PROJECT_PATH,
+            request=dict(
+                name=STORED_INFO_TYPE_PROJECT_PATH,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -692,10 +759,12 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.inspect_content.assert_called_once_with(
-            parent=PROJECT_PATH,
-            inspect_config=None,
-            item=None,
-            inspect_template_name=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                inspect_config=None,
+                item=None,
+                inspect_template_name=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -722,9 +791,11 @@ class TestCloudDLPHook:
 
         assert isinstance(result, list)
         get_conn.return_value.list_deidentify_templates.assert_called_once_with(
-            parent=ORGANIZATION_PATH,
-            page_size=None,
-            order_by=None,
+            request=dict(
+                parent=ORGANIZATION_PATH,
+                page_size=None,
+                order_by=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -736,9 +807,11 @@ class TestCloudDLPHook:
 
         assert isinstance(result, list)
         get_conn.return_value.list_deidentify_templates.assert_called_once_with(
-            parent=PROJECT_PATH,
-            page_size=None,
-            order_by=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                page_size=None,
+                order_by=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -760,11 +833,13 @@ class TestCloudDLPHook:
 
         assert isinstance(result, list)
         get_conn.return_value.list_dlp_jobs.assert_called_once_with(
-            parent=PROJECT_PATH,
-            filter_=None,
-            page_size=None,
-            type_=None,
-            order_by=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                filter=None,
+                page_size=None,
+                type_=None,
+                order_by=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -787,7 +862,13 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.list_info_types.assert_called_once_with(
-            language_code=None, filter_=None, retry=DEFAULT, timeout=None, metadata=()
+            request=dict(
+                language_code=None,
+                filter=None,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
         )
 
     @mock.patch(
@@ -801,9 +882,11 @@ class TestCloudDLPHook:
 
         assert isinstance(result, list)
         get_conn.return_value.list_inspect_templates.assert_called_once_with(
-            parent=ORGANIZATION_PATH,
-            page_size=None,
-            order_by=None,
+            request=dict(
+                parent=ORGANIZATION_PATH,
+                page_size=None,
+                order_by=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -815,9 +898,11 @@ class TestCloudDLPHook:
 
         assert isinstance(result, list)
         get_conn.return_value.list_inspect_templates.assert_called_once_with(
-            parent=PROJECT_PATH,
-            page_size=None,
-            order_by=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                page_size=None,
+                order_by=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -839,10 +924,12 @@ class TestCloudDLPHook:
 
         assert isinstance(result, list)
         get_conn.return_value.list_job_triggers.assert_called_once_with(
-            parent=PROJECT_PATH,
-            page_size=None,
-            order_by=None,
-            filter_=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                page_size=None,
+                order_by=None,
+                filter=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -869,9 +956,11 @@ class TestCloudDLPHook:
 
         assert isinstance(result, list)
         get_conn.return_value.list_stored_info_types.assert_called_once_with(
-            parent=ORGANIZATION_PATH,
-            page_size=None,
-            order_by=None,
+            request=dict(
+                parent=ORGANIZATION_PATH,
+                page_size=None,
+                order_by=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -883,9 +972,11 @@ class TestCloudDLPHook:
 
         assert isinstance(result, list)
         get_conn.return_value.list_stored_info_types.assert_called_once_with(
-            parent=PROJECT_PATH,
-            page_size=None,
-            order_by=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                page_size=None,
+                order_by=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -908,11 +999,13 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.redact_image.assert_called_once_with(
-            parent=PROJECT_PATH,
-            inspect_config=None,
-            image_redaction_configs=None,
-            include_findings=None,
-            byte_item=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                inspect_config=None,
+                image_redaction_configs=None,
+                include_findings=None,
+                byte_item=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -935,12 +1028,14 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.reidentify_content.assert_called_once_with(
-            parent=PROJECT_PATH,
-            reidentify_config=None,
-            inspect_config=None,
-            item=None,
-            inspect_template_name=None,
-            reidentify_template_name=None,
+            request=dict(
+                parent=PROJECT_PATH,
+                reidentify_config=None,
+                inspect_config=None,
+                item=None,
+                inspect_template_name=None,
+                reidentify_template_name=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -970,9 +1065,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.update_deidentify_template.assert_called_once_with(
-            name=DEIDENTIFY_TEMPLATE_ORGANIZATION_PATH,
-            deidentify_template=None,
-            update_mask=None,
+            request=dict(
+                name=DEIDENTIFY_TEMPLATE_ORGANIZATION_PATH,
+                deidentify_template=None,
+                update_mask=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -985,9 +1082,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.update_deidentify_template.assert_called_once_with(
-            name=DEIDENTIFY_TEMPLATE_PROJECT_PATH,
-            deidentify_template=None,
-            update_mask=None,
+            request=dict(
+                name=DEIDENTIFY_TEMPLATE_PROJECT_PATH,
+                deidentify_template=None,
+                update_mask=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -1020,9 +1119,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.update_inspect_template.assert_called_once_with(
-            name=INSPECT_TEMPLATE_ORGANIZATION_PATH,
-            inspect_template=None,
-            update_mask=None,
+            request=dict(
+                name=INSPECT_TEMPLATE_ORGANIZATION_PATH,
+                inspect_template=None,
+                update_mask=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -1035,9 +1136,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.update_inspect_template.assert_called_once_with(
-            name=INSPECT_TEMPLATE_PROJECT_PATH,
-            inspect_template=None,
-            update_mask=None,
+            request=dict(
+                name=INSPECT_TEMPLATE_PROJECT_PATH,
+                inspect_template=None,
+                update_mask=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -1102,9 +1205,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.update_stored_info_type.assert_called_once_with(
-            name=STORED_INFO_TYPE_ORGANIZATION_PATH,
-            config=None,
-            update_mask=None,
+            request=dict(
+                name=STORED_INFO_TYPE_ORGANIZATION_PATH,
+                config=None,
+                update_mask=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -1119,9 +1224,11 @@ class TestCloudDLPHook:
 
         assert result is API_RESPONSE
         get_conn.return_value.update_stored_info_type.assert_called_once_with(
-            name=STORED_INFO_TYPE_PROJECT_PATH,
-            config=None,
-            update_mask=None,
+            request=dict(
+                name=STORED_INFO_TYPE_PROJECT_PATH,
+                config=None,
+                update_mask=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
diff --git a/tests/providers/google/cloud/operators/test_dlp.py b/tests/providers/google/cloud/operators/test_dlp.py
index e33638caab..ba98a2381f 100644
--- a/tests/providers/google/cloud/operators/test_dlp.py
+++ b/tests/providers/google/cloud/operators/test_dlp.py
@@ -24,7 +24,18 @@ import unittest
 from unittest import mock
 
 from google.api_core.gapic_v1.method import DEFAULT
-from google.cloud.dlp_v2.types import DeidentifyTemplate, DlpJob, InspectTemplate, JobTrigger, StoredInfoType
+from google.cloud.dlp_v2.types import (
+    DeidentifyContentResponse,
+    DeidentifyTemplate,
+    DlpJob,
+    InspectContentResponse,
+    InspectTemplate,
+    JobTrigger,
+    ListInfoTypesResponse,
+    RedactImageResponse,
+    ReidentifyContentResponse,
+    StoredInfoType,
+)
 
 from airflow.providers.google.cloud.operators.dlp import (
     CloudDLPCancelDLPJobOperator,
@@ -201,7 +212,7 @@ class TestCloudDLPCreateStoredInfoTypeOperator(unittest.TestCase):
 class TestCloudDLPDeidentifyContentOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_deidentify_content(self, mock_hook):
-        mock_hook.return_value.deidentify_content.return_value = mock.MagicMock()
+        mock_hook.return_value.deidentify_content.return_value = DeidentifyContentResponse()
         operator = CloudDLPDeidentifyContentOperator(project_id=PROJECT_ID, task_id="id")
         operator.execute(context=mock.MagicMock())
         mock_hook.assert_called_once_with(
@@ -332,7 +343,7 @@ class TestCloudDLPDeleteStoredInfoTypeOperator(unittest.TestCase):
 class TestCloudDLPGetDeidentifyTemplateOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_get_deidentify_template(self, mock_hook):
-        mock_hook.return_value.get_deidentify_template.return_value = mock.MagicMock()
+        mock_hook.return_value.get_deidentify_template.return_value = DeidentifyTemplate()
         operator = CloudDLPGetDeidentifyTemplateOperator(
             template_id=TEMPLATE_ID, organization_id=ORGANIZATION_ID, task_id="id"
         )
@@ -354,7 +365,7 @@ class TestCloudDLPGetDeidentifyTemplateOperator(unittest.TestCase):
 class TestCloudDLPGetDlpJobOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_get_dlp_job(self, mock_hook):
-        mock_hook.return_value.get_dlp_job.return_value = mock.MagicMock()
+        mock_hook.return_value.get_dlp_job.return_value = DlpJob()
         operator = CloudDLPGetDLPJobOperator(dlp_job_id=DLP_JOB_ID, project_id=PROJECT_ID, task_id="id")
         operator.execute(context=mock.MagicMock())
         mock_hook.assert_called_once_with(
@@ -373,7 +384,7 @@ class TestCloudDLPGetDlpJobOperator(unittest.TestCase):
 class TestCloudDLPGetInspectTemplateOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_get_inspect_template(self, mock_hook):
-        mock_hook.return_value.get_inspect_template.return_value = mock.MagicMock()
+        mock_hook.return_value.get_inspect_template.return_value = InspectTemplate()
         operator = CloudDLPGetInspectTemplateOperator(
             template_id=TEMPLATE_ID, organization_id=ORGANIZATION_ID, task_id="id"
         )
@@ -395,7 +406,7 @@ class TestCloudDLPGetInspectTemplateOperator(unittest.TestCase):
 class TestCloudDLPGetJobTripperOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_get_job_trigger(self, mock_hook):
-        mock_hook.return_value.get_job_trigger.return_value = mock.MagicMock()
+        mock_hook.return_value.get_job_trigger.return_value = JobTrigger()
         operator = CloudDLPGetDLPJobTriggerOperator(
             job_trigger_id=TRIGGER_ID, project_id=PROJECT_ID, task_id="id"
         )
@@ -416,7 +427,7 @@ class TestCloudDLPGetJobTripperOperator(unittest.TestCase):
 class TestCloudDLPGetStoredInfoTypeOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_get_stored_info_type(self, mock_hook):
-        mock_hook.return_value.get_stored_info_type.return_value = mock.MagicMock()
+        mock_hook.return_value.get_stored_info_type.return_value = StoredInfoType()
         operator = CloudDLPGetStoredInfoTypeOperator(
             stored_info_type_id=STORED_INFO_TYPE_ID,
             organization_id=ORGANIZATION_ID,
@@ -441,7 +452,7 @@ class TestCloudDLPInspectContentOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_inspect_content(self, mock_hook):
         inspect_template_name = "inspect_template_name/name"
-        mock_hook.return_value.inspect_content.return_value = mock.MagicMock()
+        mock_hook.return_value.inspect_content.return_value = InspectContentResponse()
         operator = CloudDLPInspectContentOperator(
             project_id=PROJECT_ID, task_id="id", inspect_template_name=inspect_template_name
         )
@@ -507,7 +518,7 @@ class TestCloudDLPListDlpJobsOperator(unittest.TestCase):
 class TestCloudDLPListInfoTypesOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_list_info_types(self, mock_hook):
-        mock_hook.return_value.list_info_types.return_value = mock.MagicMock()
+        mock_hook.return_value.list_info_types.return_value = ListInfoTypesResponse()
         operator = CloudDLPListInfoTypesOperator(task_id="id")
         operator.execute(context=mock.MagicMock())
         mock_hook.assert_called_once_with(
@@ -589,7 +600,7 @@ class TestCloudDLPListStoredInfoTypesOperator(unittest.TestCase):
 class TestCloudDLPRedactImageOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_redact_image(self, mock_hook):
-        mock_hook.return_value.redact_image.return_value = mock.MagicMock()
+        mock_hook.return_value.redact_image.return_value = RedactImageResponse()
         operator = CloudDLPRedactImageOperator(project_id=PROJECT_ID, task_id="id")
         operator.execute(context=None)
         mock_hook.assert_called_once_with(
@@ -611,7 +622,7 @@ class TestCloudDLPRedactImageOperator(unittest.TestCase):
 class TestCloudDLPReidentifyContentOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_reidentify_content(self, mock_hook):
-        mock_hook.return_value.reidentify_content.return_value = mock.MagicMock()
+        mock_hook.return_value.reidentify_content.return_value = ReidentifyContentResponse()
         operator = CloudDLPReidentifyContentOperator(project_id=PROJECT_ID, task_id="id")
         operator.execute(context=None)
         mock_hook.assert_called_once_with(
@@ -634,7 +645,7 @@ class TestCloudDLPReidentifyContentOperator(unittest.TestCase):
 class TestCloudDLPUpdateDeidentifyTemplateOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_update_deidentify_template(self, mock_hook):
-        mock_hook.return_value.update_deidentify_template.return_value = mock.MagicMock()
+        mock_hook.return_value.update_deidentify_template.return_value = DeidentifyTemplate()
         operator = CloudDLPUpdateDeidentifyTemplateOperator(
             template_id=TEMPLATE_ID, organization_id=ORGANIZATION_ID, task_id="id"
         )
@@ -658,7 +669,7 @@ class TestCloudDLPUpdateDeidentifyTemplateOperator(unittest.TestCase):
 class TestCloudDLPUpdateInspectTemplateOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_update_inspect_template(self, mock_hook):
-        mock_hook.return_value.update_inspect_template.return_value = mock.MagicMock()
+        mock_hook.return_value.update_inspect_template.return_value = InspectTemplate()
         operator = CloudDLPUpdateInspectTemplateOperator(
             template_id=TEMPLATE_ID, organization_id=ORGANIZATION_ID, task_id="id"
         )
@@ -682,7 +693,7 @@ class TestCloudDLPUpdateInspectTemplateOperator(unittest.TestCase):
 class TestCloudDLPUpdateJobTriggerOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_update_job_trigger(self, mock_hook):
-        mock_hook.return_value.update_job_trigger.return_value = mock.MagicMock()
+        mock_hook.return_value.update_job_trigger.return_value = JobTrigger()
         operator = CloudDLPUpdateJobTriggerOperator(job_trigger_id=TRIGGER_ID, task_id="id")
         operator.execute(context=mock.MagicMock())
         mock_hook.assert_called_once_with(
@@ -703,7 +714,7 @@ class TestCloudDLPUpdateJobTriggerOperator(unittest.TestCase):
 class TestCloudDLPUpdateStoredInfoTypeOperator(unittest.TestCase):
     @mock.patch("airflow.providers.google.cloud.operators.dlp.CloudDLPHook")
     def test_update_stored_info_type(self, mock_hook):
-        mock_hook.return_value.update_stored_info_type.return_value = mock.MagicMock()
+        mock_hook.return_value.update_stored_info_type.return_value = StoredInfoType()
         operator = CloudDLPUpdateStoredInfoTypeOperator(
             stored_info_type_id=STORED_INFO_TYPE_ID,
             organization_id=ORGANIZATION_ID,