Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/31 03:21:58 UTC

[airflow] branch main updated: Migration of System Tests: Dataplex (AIP-47) (#26989)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 95e5675714 Migration of System Tests: Dataplex (AIP-47) (#26989)
95e5675714 is described below

commit 95e5675714f12c177e30d83a14d28222b06d217b
Author: Beata Kossakowska <10...@users.noreply.github.com>
AuthorDate: Mon Oct 31 04:21:51 2022 +0100

    Migration of System Tests: Dataplex (AIP-47) (#26989)
---
 airflow/providers/google/cloud/hooks/dataplex.py   | 115 ++++++++++-
 airflow/providers/google/cloud/links/dataplex.py   |  27 +++
 .../providers/google/cloud/operators/dataplex.py   | 219 ++++++++++++++++++++-
 airflow/providers/google/provider.yaml             |   1 +
 .../operators/cloud/dataplex.rst                   |  53 ++++-
 .../providers/google/cloud/hooks/test_dataplex.py  |  50 +++++
 .../google/cloud/operators/test_dataplex.py        |  76 +++++++
 .../google/cloud/operators/test_dataplex_system.py |  47 -----
 .../providers/google/cloud/dataplex/__init__.py    |  16 ++
 .../google/cloud/dataplex}/example_dataplex.py     | 119 +++++++++--
 .../google/cloud/dataplex/resources/__init__.py    |  16 ++
 .../cloud/dataplex/resources/spark_example_pi.py   |  43 ++++
 12 files changed, 710 insertions(+), 72 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/dataplex.py b/airflow/providers/google/cloud/hooks/dataplex.py
index 1dbee03b14..e8121b582e 100644
--- a/airflow/providers/google/cloud/hooks/dataplex.py
+++ b/airflow/providers/google/cloud/hooks/dataplex.py
@@ -24,10 +24,11 @@ from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.operation import Operation
 from google.api_core.retry import Retry
 from google.cloud.dataplex_v1 import DataplexServiceClient
-from google.cloud.dataplex_v1.types import Task
+from google.cloud.dataplex_v1.types import Lake, Task
 from googleapiclient.discovery import Resource
 
 from airflow.exceptions import AirflowException
+from airflow.providers.google.common.consts import CLIENT_INFO
 from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
 
 
@@ -70,7 +71,7 @@ class DataplexHook(GoogleBaseHook):
         client_options = ClientOptions(api_endpoint="dataplex.googleapis.com:443")
 
         return DataplexServiceClient(
-            credentials=self.get_credentials(), client_info=self.client_info, client_options=client_options
+            credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
         )
 
     def wait_for_operation(self, timeout: float | None, operation: Operation):
@@ -248,3 +249,113 @@ class DataplexHook(GoogleBaseHook):
             metadata=metadata,
         )
         return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_lake(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Deletes the lake resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake to be deleted.
+        :param retry: A retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}"
+
+        client = self.get_dataplex_client()
+        result = client.delete_lake(
+            request={
+                "name": name,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_lake(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        body: dict[str, Any] | Lake,
+        validate_only: bool | None = None,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Creates a lake resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the lake belongs to.
+        :param lake_id: Required. Lake identifier.
+        :param body: Required. The request body contains an instance of Lake.
+        :param validate_only: Optional. Only validate the request, but do not perform mutations.
+            The default is false.
+        :param retry: A retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        parent = f"projects/{project_id}/locations/{region}"
+        client = self.get_dataplex_client()
+        result = client.create_lake(
+            request={
+                "parent": parent,
+                "lake_id": lake_id,
+                "lake": body,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_lake(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Gets the lake resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake to be retrieved.
+        :param retry: A retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}/"
+        client = self.get_dataplex_client()
+        result = client.get_lake(
+            request={
+                "name": name,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
diff --git a/airflow/providers/google/cloud/links/dataplex.py b/airflow/providers/google/cloud/links/dataplex.py
index e7381c1417..dcf3c87558 100644
--- a/airflow/providers/google/cloud/links/dataplex.py
+++ b/airflow/providers/google/cloud/links/dataplex.py
@@ -29,6 +29,10 @@ DATAPLEX_BASE_LINK = "/dataplex/process/tasks"
 DATAPLEX_TASK_LINK = DATAPLEX_BASE_LINK + "/{lake_id}.{task_id};location={region}/jobs?project={project_id}"
 DATAPLEX_TASKS_LINK = DATAPLEX_BASE_LINK + "?project={project_id}&qLake={lake_id}.{region}"
 
+DATAPLEX_LAKE_LINK = (
+    "https://console.cloud.google.com/dataplex/lakes/{lake_id};location={region}?project={project_id}"
+)
+
 
 class DataplexTaskLink(BaseGoogleLink):
     """Helper class for constructing Dataplex Task link"""
@@ -75,3 +79,26 @@ class DataplexTasksLink(BaseGoogleLink):
                 "region": task_instance.region,
             },
         )
+
+
+class DataplexLakeLink(BaseGoogleLink):
+    """Helper class for constructing Dataplex Lake link"""
+
+    name = "Dataplex Lake"
+    key = "dataplex_lake_key"
+    format_str = DATAPLEX_LAKE_LINK
+
+    @staticmethod
+    def persist(
+        context: Context,
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=DataplexLakeLink.key,
+            value={
+                "lake_id": task_instance.lake_id,
+                "region": task_instance.region,
+                "project_id": task_instance.project_id,
+            },
+        )
diff --git a/airflow/providers/google/cloud/operators/dataplex.py b/airflow/providers/google/cloud/operators/dataplex.py
index e212a134bf..12c3917690 100644
--- a/airflow/providers/google/cloud/operators/dataplex.py
+++ b/airflow/providers/google/cloud/operators/dataplex.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """This module contains Google Dataplex operators."""
+
 from __future__ import annotations
 
 from time import sleep
@@ -25,12 +26,16 @@ if TYPE_CHECKING:
 
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry, exponential_sleep_generator
-from google.cloud.dataplex_v1.types import Task
+from google.cloud.dataplex_v1.types import Lake, Task
 from googleapiclient.errors import HttpError
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.dataplex import DataplexHook
-from airflow.providers.google.cloud.links.dataplex import DataplexTaskLink, DataplexTasksLink
+from airflow.providers.google.cloud.links.dataplex import (
+    DataplexLakeLink,
+    DataplexTaskLink,
+    DataplexTasksLink,
+)
 
 
 class DataplexCreateTaskOperator(BaseOperator):
@@ -427,4 +432,214 @@ class DataplexGetTaskOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+        DataplexTasksLink.persist(context=context, task_instance=self)
         return Task.to_dict(task)
+
+
+class DataplexCreateLakeOperator(BaseOperator):
+    """
+    Creates a lake resource.
+
+    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
+    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
+    :param lake_id: Required. Lake identifier.
+    :param body: Required. The request body contains an instance of Lake.
+    :param validate_only: Optional. Only validate the request, but do not perform mutations. The default is
+        false.
+    :param api_version: The version of the API that will be requested, for example 'v1'.
+    :param retry: A retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :param timeout: The amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :param metadata: Additional metadata that is provided to the method.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :param asynchronous: Flag indicating whether the Dataplex lake should be created asynchronously.
+        This is useful for long-running lake creation, which can then be waited on
+        asynchronously using the DataplexLakeSensor.
+    """
+
+    template_fields = (
+        "project_id",
+        "lake_id",
+        "body",
+        "validate_only",
+        "delegate_to",
+        "impersonation_chain",
+    )
+    template_fields_renderers = {"body": "json"}
+    operator_extra_links = (DataplexLakeLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        body: dict[str, Any],
+        validate_only: bool | None = None,
+        api_version: str = "v1",
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        asynchronous: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.lake_id = lake_id
+        self.body = body
+        self.validate_only = validate_only
+        self.api_version = api_version
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        self.asynchronous = asynchronous
+
+    def execute(self, context: Context) -> dict:
+        hook = DataplexHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Creating Dataplex lake %s", self.lake_id)
+
+        try:
+            operation = hook.create_lake(
+                project_id=self.project_id,
+                region=self.region,
+                lake_id=self.lake_id,
+                body=self.body,
+                validate_only=self.validate_only,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            if not self.asynchronous:
+                self.log.info("Waiting for Dataplex lake %s to be created", self.lake_id)
+                lake = hook.wait_for_operation(timeout=self.timeout, operation=operation)
+                self.log.info("Lake %s created successfully", self.lake_id)
+            else:
+                is_done = operation.done()
+                self.log.info("Is operation done already? %s", is_done)
+                return is_done
+        except HttpError as err:
+            if err.resp.status not in (409, "409"):
+                raise
+            self.log.info("Lake %s already exists", self.lake_id)
+            # Wait for lake to be ready
+            for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
+                lake = hook.get_lake(
+                    project_id=self.project_id,
+                    region=self.region,
+                    lake_id=self.lake_id,
+                    retry=self.retry,
+                    timeout=self.timeout,
+                    metadata=self.metadata,
+                )
+                if lake["state"] != "CREATING":
+                    break
+                sleep(time_to_wait)
+        DataplexLakeLink.persist(
+            context=context,
+            task_instance=self,
+        )
+        return Lake.to_dict(lake)
+
+
+class DataplexDeleteLakeOperator(BaseOperator):
+    """
+    Delete the lake resource.
+
+    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
+    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
+    :param lake_id: Required. Lake identifier.
+    :param api_version: The version of the API that will be requested, for example 'v1'.
+    :param retry: A retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :param timeout: The amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :param metadata: Additional metadata that is provided to the method.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields = ("project_id", "lake_id", "delegate_to", "impersonation_chain")
+    operator_extra_links = (DataplexLakeLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        api_version: str = "v1",
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        *args,
+        **kwargs,
+    ) -> None:
+
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.lake_id = lake_id
+        self.api_version = api_version
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Context) -> None:
+
+        hook = DataplexHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        self.log.info("Deleting Dataplex lake %s", self.lake_id)
+
+        operation = hook.delete_lake(
+            project_id=self.project_id,
+            region=self.region,
+            lake_id=self.lake_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        DataplexLakeLink.persist(context=context, task_instance=self)
+        hook.wait_for_operation(timeout=self.timeout, operation=operation)
+        self.log.info("Dataplex lake %s deleted successfully!", self.lake_id)
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index fb0a3d434c..10c47c7d54 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -964,6 +964,7 @@ extra-links:
   - airflow.providers.google.cloud.links.cloud_sql.CloudSQLInstanceDatabaseLink
   - airflow.providers.google.cloud.links.dataplex.DataplexTaskLink
   - airflow.providers.google.cloud.links.dataplex.DataplexTasksLink
+  - airflow.providers.google.cloud.links.dataplex.DataplexLakeLink
   - airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink
   - airflow.providers.google.cloud.links.bigquery.BigQueryTableLink
   - airflow.providers.google.cloud.links.bigquery_dts.BigQueryDataTransferConfigLink
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
index 00b1949a0a..c6ed5db7da 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -31,7 +31,7 @@ For more information about the available fields to pass when creating a task, vi
 
 A simple task configuration can look as follows:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 0
     :start-after: [START howto_dataplex_configuration]
@@ -40,13 +40,13 @@ A simple task configuration can look as followed:
 With this configuration we can create the task both synchronously & asynchronously:
 :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateTaskOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
     :start-after: [START howto_dataplex_create_task_operator]
     :end-before: [END howto_dataplex_create_task_operator]
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
     :start-after: [START howto_dataplex_async_create_task_operator]
@@ -59,7 +59,7 @@ To delete a task you can use:
 
 :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteTaskOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
     :start-after: [START howto_dataplex_delete_task_operator]
@@ -72,7 +72,7 @@ To list tasks you can use:
 
 :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexListTasksOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
     :start-after: [START howto_dataplex_list_tasks_operator]
@@ -85,7 +85,7 @@ To get a task you can use:
 
 :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetTaskOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
     :start-after: [START howto_dataplex_get_task_operator]
@@ -98,8 +98,47 @@ To wait for a task created asynchronously you can use:
 
 :class:`~airflow.providers.google.cloud.sensors.dataplex.DataplexTaskStateSensor`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
     :language: python
     :dedent: 4
     :start-after: [START howto_dataplex_task_state_sensor]
     :end-before: [END howto_dataplex_task_state_sensor]
+
+Create a Lake
+-------------
+
+Before you create a Dataplex lake, you need to define its body.
+
+For more information about the available fields to pass when creating a lake, visit `Dataplex create lake API. <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes#Lake>`__
+
+A simple lake configuration can look as follows:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_dataplex_lake_configuration]
+    :end-before: [END howto_dataplex_lake_configuration]
+
+With this configuration we can create the lake:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateLakeOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_create_lake_operator]
+    :end-before: [END howto_dataplex_create_lake_operator]
+
+
+Delete a Lake
+-------------
+
+To delete a lake you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteLakeOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_delete_lake_operator]
+    :end-before: [END howto_dataplex_delete_lake_operator]
diff --git a/tests/providers/google/cloud/hooks/test_dataplex.py b/tests/providers/google/cloud/hooks/test_dataplex.py
index 562b60a0e4..9a11c01bad 100644
--- a/tests/providers/google/cloud/hooks/test_dataplex.py
+++ b/tests/providers/google/cloud/hooks/test_dataplex.py
@@ -121,3 +121,53 @@ class TestDataplexHook(TestCase):
             timeout=None,
             metadata=(),
         )
+
+    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    def test_create_lake(self, mock_client):
+        self.hook.create_lake(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            body=BODY,
+            validate_only=None,
+        )
+
+        parent = f"projects/{PROJECT_ID}/locations/{REGION}"
+        mock_client.return_value.create_lake.assert_called_once_with(
+            request=dict(
+                parent=parent,
+                lake_id=LAKE_ID,
+                lake=BODY,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    def test_delete_lake(self, mock_client):
+        self.hook.delete_lake(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID)
+
+        name = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}"
+        mock_client.return_value.delete_lake.assert_called_once_with(
+            request=dict(
+                name=name,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    def test_get_lake(self, mock_client):
+        self.hook.get_lake(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID)
+
+        name = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/"
+        mock_client.return_value.get_lake.assert_called_once_with(
+            request=dict(
+                name=name,
+            ),
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/tests/providers/google/cloud/operators/test_dataplex.py b/tests/providers/google/cloud/operators/test_dataplex.py
index 8ecd396780..77115e2066 100644
--- a/tests/providers/google/cloud/operators/test_dataplex.py
+++ b/tests/providers/google/cloud/operators/test_dataplex.py
@@ -21,7 +21,9 @@ from unittest import TestCase, mock
 from google.api_core.gapic_v1.method import DEFAULT
 
 from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCreateLakeOperator,
     DataplexCreateTaskOperator,
+    DataplexDeleteLakeOperator,
     DataplexDeleteTaskOperator,
     DataplexGetTaskOperator,
     DataplexListTasksOperator,
@@ -29,11 +31,18 @@ from airflow.providers.google.cloud.operators.dataplex import (
 
 HOOK_STR = "airflow.providers.google.cloud.operators.dataplex.DataplexHook"
 TASK_STR = "airflow.providers.google.cloud.operators.dataplex.Task"
+LAKE_STR = "airflow.providers.google.cloud.operators.dataplex.Lake"
 
 PROJECT_ID = "project-id"
 REGION = "region"
 LAKE_ID = "lake-id"
 BODY = {"body": "test"}
+BODY_LAKE = {
+    "display_name": "test_display_name",
+    "labels": [],
+    "description": "test_description",
+    "metastore": {"service": ""},
+}
 DATAPLEX_TASK_ID = "testTask001"
 
 GCP_CONN_ID = "google_cloud_default"
@@ -180,3 +189,70 @@ class TestDataplexGetTaskOperator(TestCase):
             timeout=None,
             metadata=(),
         )
+
+
+class TestDataplexDeleteLakeOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock):
+        op = DataplexDeleteLakeOperator(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            task_id="delete_dataplex_lake",
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.delete_lake.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexCreateLakeOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    @mock.patch(LAKE_STR)
+    def test_execute(self, lake_mock, hook_mock):
+        op = DataplexCreateLakeOperator(
+            task_id="create_dataplex_lake",
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            body=BODY_LAKE,
+            validate_only=None,
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.wait_for_operation.return_value = None
+        lake_mock.return_value.to_dict.return_value = None
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.create_lake.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            body=BODY_LAKE,
+            validate_only=None,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/tests/providers/google/cloud/operators/test_dataplex_system.py b/tests/providers/google/cloud/operators/test_dataplex_system.py
deleted file mode 100644
index b3b9c709d4..0000000000
--- a/tests/providers/google/cloud/operators/test_dataplex_system.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import pytest
-
-from airflow.providers.google.cloud.example_dags.example_dataplex import BUCKET, SPARK_FILE_NAME
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-GCS_URI = f"gs://{BUCKET}"
-
-spark_file = """
-#!/usr/bin/python
-print("### Hello, dataplex! ###")
-"""
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_GCS_KEY)
-class DataplexExampleDagsTest(GoogleSystemTest):
-    def setUp(self):
-        super().setUp()
-        self.create_gcs_bucket(BUCKET)
-        self.upload_content_to_gcs(lines=spark_file, bucket=GCS_URI, filename=SPARK_FILE_NAME)
-
-    def tearDown(self):
-        self.delete_gcs_bucket(BUCKET)
-        super().tearDown()
-
-    @provide_gcp_context(GCP_GCS_KEY)
-    def test_run_example_dag(self):
-        self.run_dag(dag_id="example_dataplex", dag_folder=CLOUD_DAG_FOLDER)
diff --git a/tests/system/providers/google/cloud/dataplex/__init__.py b/tests/system/providers/google/cloud/dataplex/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataplex/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/google/cloud/example_dags/example_dataplex.py b/tests/system/providers/google/cloud/dataplex/example_dataplex.py
similarity index 53%
rename from airflow/providers/google/cloud/example_dags/example_dataplex.py
rename to tests/system/providers/google/cloud/dataplex/example_dataplex.py
index b12780176a..bc2b6462b8 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataplex.py
+++ b/tests/system/providers/google/cloud/dataplex/example_dataplex.py
@@ -21,25 +21,41 @@ from __future__ import annotations
 
 import datetime
 import os
+from pathlib import Path
 
 from airflow import models
 from airflow.models.baseoperator import chain
 from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCreateLakeOperator,
     DataplexCreateTaskOperator,
+    DataplexDeleteLakeOperator,
     DataplexDeleteTaskOperator,
     DataplexGetTaskOperator,
     DataplexListTasksOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.sensors.dataplex import DataplexTaskStateSensor
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "INVALID PROJECT ID")
-REGION = os.environ.get("GCP_REGION", "INVALID REGION")
-LAKE_ID = os.environ.get("GCP_LAKE_ID", "INVALID LAKE ID")
-SERVICE_ACC = os.environ.get("GCP_DATAPLEX_SERVICE_ACC", "XYZ@developer.gserviceaccount.com")
-BUCKET = os.environ.get("GCP_DATAPLEX_BUCKET", "INVALID BUCKET NAME")
-SPARK_FILE_NAME = os.environ.get("SPARK_FILE_NAME", "INVALID FILE NAME")
-SPARK_FILE_FULL_PATH = f"gs://{BUCKET}/{SPARK_FILE_NAME}"
-DATAPLEX_TASK_ID = "task001"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "example_dataplex"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+SPARK_FILE_NAME = "spark_example_pi.py"
+CURRENT_FOLDER = Path(__file__).parent
+FILE_LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources" / SPARK_FILE_NAME)
+
+LAKE_ID = f"test-lake-{ENV_ID}"
+REGION = "us-central1"
+
+SERVICE_ACC = os.environ.get("GCP_DATAPLEX_SERVICE_ACC")
+
+SPARK_FILE_FULL_PATH = f"gs://{BUCKET_NAME}/{SPARK_FILE_NAME}"
+DATAPLEX_TASK_ID = f"test-task-{ENV_ID}"
 TRIGGER_SPEC_TYPE = "ON_DEMAND"
 
 # [START howto_dataplex_configuration]
@@ -50,10 +66,39 @@ EXAMPLE_TASK_BODY = {
 }
 # [END howto_dataplex_configuration]
 
+# [START howto_dataplex_lake_configuration]
+EXAMPLE_LAKE_BODY = {
+    "display_name": "test_display_name",
+    "labels": [],
+    "description": "test_description",
+    "metastore": {"service": ""},
+}
+# [END howto_dataplex_lake_configuration]
+
+
 with models.DAG(
-    "example_dataplex",
+    DAG_ID,
     start_date=datetime.datetime(2021, 1, 1),
+    schedule="@once",
+    tags=["example", "dataplex"],
 ) as dag:
+
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+    )
+
+    upload_file = LocalFilesystemToGCSOperator(
+        task_id="upload_file",
+        src=FILE_LOCAL_PATH,
+        dst=SPARK_FILE_NAME,
+        bucket=BUCKET_NAME,
+    )
+    # [START howto_dataplex_create_lake_operator]
+    create_lake = DataplexCreateLakeOperator(
+        project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
+    )
+    # [END howto_dataplex_create_lake_operator]
+
     # [START howto_dataplex_create_task_operator]
     create_dataplex_task = DataplexCreateTaskOperator(
         project_id=PROJECT_ID,
@@ -71,19 +116,19 @@ with models.DAG(
         region=REGION,
         lake_id=LAKE_ID,
         body=EXAMPLE_TASK_BODY,
-        dataplex_task_id=DATAPLEX_TASK_ID,
+        dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
         asynchronous=True,
         task_id="create_dataplex_task_async",
     )
     # [END howto_dataplex_async_create_task_operator]
 
     # [START howto_dataplex_delete_task_operator]
-    delete_dataplex_task = DataplexDeleteTaskOperator(
+    delete_dataplex_task_async = DataplexDeleteTaskOperator(
         project_id=PROJECT_ID,
         region=REGION,
         lake_id=LAKE_ID,
-        dataplex_task_id=DATAPLEX_TASK_ID,
-        task_id="delete_dataplex_task",
+        dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
+        task_id="delete_dataplex_task_async",
     )
     # [END howto_dataplex_delete_task_operator]
 
@@ -113,11 +158,57 @@ with models.DAG(
     )
     # [END howto_dataplex_task_state_sensor]
 
+    # [START howto_dataplex_delete_task_operator]
+    delete_dataplex_task = DataplexDeleteTaskOperator(
+        project_id=PROJECT_ID,
+        region=REGION,
+        lake_id=LAKE_ID,
+        dataplex_task_id=DATAPLEX_TASK_ID,
+        task_id="delete_dataplex_task",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END howto_dataplex_delete_task_operator]
+
+    # [START howto_dataplex_delete_lake_operator]
+    delete_lake = DataplexDeleteLakeOperator(
+        project_id=PROJECT_ID,
+        region=REGION,
+        lake_id=LAKE_ID,
+        task_id="delete_lake",
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    # [END howto_dataplex_delete_lake_operator]
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
     chain(
+        # TEST SETUP
+        create_bucket,
+        upload_file,
+        # TEST BODY
+        create_lake,
         create_dataplex_task,
         get_dataplex_task,
         list_dataplex_task,
-        delete_dataplex_task,
         create_dataplex_task_async,
+        delete_dataplex_task_async,
         dataplex_task_state,
+        # TEST TEARDOWN
+        delete_dataplex_task,
+        delete_lake,
+        delete_bucket,
     )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/dataplex/resources/__init__.py b/tests/system/providers/google/cloud/dataplex/resources/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataplex/resources/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/google/cloud/dataplex/resources/spark_example_pi.py b/tests/system/providers/google/cloud/dataplex/resources/spark_example_pi.py
new file mode 100644
index 0000000000..77a7cb9e6c
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataplex/resources/spark_example_pi.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import sys
+from operator import add
+from random import random
+
+from pyspark.sql import SparkSession
+
+if __name__ == "__main__":
+    """
+    Usage: pi [partitions]
+    """
+    spark = SparkSession.builder.appName("PythonPi").getOrCreate()
+
+    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
+    n = 100000 * partitions
+
+    def f(_: int) -> float:
+        x = random() * 2 - 1
+        y = random() * 2 - 1
+        return 1 if x**2 + y**2 <= 1 else 0
+
+    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+    print(f"Pi is roughly {4.0 * count / n:f}")
+
+    spark.stop()