You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/31 03:21:58 UTC
[airflow] branch main updated: Migration of System Tests: Dataplex (AIP-47) (#26989)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 95e5675714 Migration of System Tests: Dataplex (AIP-47) (#26989)
95e5675714 is described below
commit 95e5675714f12c177e30d83a14d28222b06d217b
Author: Beata Kossakowska <10...@users.noreply.github.com>
AuthorDate: Mon Oct 31 04:21:51 2022 +0100
Migration of System Tests: Dataplex (AIP-47) (#26989)
---
airflow/providers/google/cloud/hooks/dataplex.py | 115 ++++++++++-
airflow/providers/google/cloud/links/dataplex.py | 27 +++
.../providers/google/cloud/operators/dataplex.py | 219 ++++++++++++++++++++-
airflow/providers/google/provider.yaml | 1 +
.../operators/cloud/dataplex.rst | 53 ++++-
.../providers/google/cloud/hooks/test_dataplex.py | 50 +++++
.../google/cloud/operators/test_dataplex.py | 76 +++++++
.../google/cloud/operators/test_dataplex_system.py | 47 -----
.../providers/google/cloud/dataplex/__init__.py | 16 ++
.../google/cloud/dataplex}/example_dataplex.py | 119 +++++++++--
.../google/cloud/dataplex/resources/__init__.py | 16 ++
.../cloud/dataplex/resources/spark_example_pi.py | 43 ++++
12 files changed, 710 insertions(+), 72 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/dataplex.py b/airflow/providers/google/cloud/hooks/dataplex.py
index 1dbee03b14..e8121b582e 100644
--- a/airflow/providers/google/cloud/hooks/dataplex.py
+++ b/airflow/providers/google/cloud/hooks/dataplex.py
@@ -24,10 +24,11 @@ from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.operation import Operation
from google.api_core.retry import Retry
from google.cloud.dataplex_v1 import DataplexServiceClient
-from google.cloud.dataplex_v1.types import Task
+from google.cloud.dataplex_v1.types import Lake, Task
from googleapiclient.discovery import Resource
from airflow.exceptions import AirflowException
+from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
@@ -70,7 +71,7 @@ class DataplexHook(GoogleBaseHook):
client_options = ClientOptions(api_endpoint="dataplex.googleapis.com:443")
return DataplexServiceClient(
- credentials=self.get_credentials(), client_info=self.client_info, client_options=client_options
+ credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
)
def wait_for_operation(self, timeout: float | None, operation: Operation):
@@ -248,3 +249,113 @@ class DataplexHook(GoogleBaseHook):
metadata=metadata,
)
return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_lake(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Delete the lake resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake to be deleted.
+        :param retry: A retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        :return: The object returned by the Dataplex client's ``delete_lake`` call.
+        """
+        lake_name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}"
+        dataplex_client = self.get_dataplex_client()
+        return dataplex_client.delete_lake(
+            request={"name": lake_name},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_lake(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        body: dict[str, Any] | Lake,
+        validate_only: bool | None = None,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Creates a lake resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the lake belongs to.
+        :param lake_id: Required. Lake identifier.
+        :param body: Required. The Request body contains an instance of Lake.
+        :param validate_only: Optional. Only validate the request, but do not perform mutations.
+            The default is false. When ``None`` the field is left out of the request so the
+            service default applies.
+        :param retry: A retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        :return: The object returned by the Dataplex client's ``create_lake`` call.
+        """
+        parent = f"projects/{project_id}/locations/{region}"
+        request: dict[str, Any] = {
+            "parent": parent,
+            "lake_id": lake_id,
+            "lake": body,
+        }
+        # Forward validate_only only when explicitly set, so callers relying on the
+        # default get exactly the same request as before while an explicit value is honoured.
+        if validate_only is not None:
+            request["validate_only"] = validate_only
+        client = self.get_dataplex_client()
+        result = client.create_lake(
+            request=request,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_lake(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Get lake resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
+        :param region: Required. The ID of the Google Cloud region that the lake belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake to be retrieved.
+        :param retry: A retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        :return: The object returned by the Dataplex client's ``get_lake`` call.
+        """
+        # NOTE(review): unlike delete_lake, this resource name ends with "/"; the hook
+        # unit test pins that exact string, so confirm with the API before changing it.
+        lake_name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}/"
+        return self.get_dataplex_client().get_lake(
+            request={"name": lake_name},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
diff --git a/airflow/providers/google/cloud/links/dataplex.py b/airflow/providers/google/cloud/links/dataplex.py
index e7381c1417..dcf3c87558 100644
--- a/airflow/providers/google/cloud/links/dataplex.py
+++ b/airflow/providers/google/cloud/links/dataplex.py
@@ -29,6 +29,10 @@ DATAPLEX_BASE_LINK = "/dataplex/process/tasks"
DATAPLEX_TASK_LINK = DATAPLEX_BASE_LINK + "/{lake_id}.{task_id};location={region}/jobs?project={project_id}"
DATAPLEX_TASKS_LINK = DATAPLEX_BASE_LINK + "?project={project_id}&qLake={lake_id}.{region}"
+# Unlike the relative task links above (built on DATAPLEX_BASE_LINK), this is a full
+# console URL, formatted with lake_id, region and project_id.
+DATAPLEX_LAKE_LINK = (
+    "https://console.cloud.google.com/dataplex/lakes/"
+    "{lake_id};location={region}?project={project_id}"
+)
+
class DataplexTaskLink(BaseGoogleLink):
"""Helper class for constructing Dataplex Task link"""
@@ -75,3 +79,26 @@ class DataplexTasksLink(BaseGoogleLink):
"region": task_instance.region,
},
)
+
+
+class DataplexLakeLink(BaseGoogleLink):
+    """Helper class for constructing a link to a single Dataplex Lake."""
+
+    name = "Dataplex Lake"
+    key = "dataplex_lake_key"
+    format_str = DATAPLEX_LAKE_LINK
+
+    @staticmethod
+    def persist(
+        context: Context,
+        task_instance,
+    ):
+        """
+        Push the values that fill ``format_str`` to XCom under ``DataplexLakeLink.key``.
+
+        :param context: The task execution context.
+        :param task_instance: The operator instance; its ``lake_id``, ``region`` and
+            ``project_id`` attributes are read.
+        """
+        link_params = {
+            "lake_id": task_instance.lake_id,
+            "region": task_instance.region,
+            "project_id": task_instance.project_id,
+        }
+        task_instance.xcom_push(context=context, key=DataplexLakeLink.key, value=link_params)
diff --git a/airflow/providers/google/cloud/operators/dataplex.py b/airflow/providers/google/cloud/operators/dataplex.py
index e212a134bf..12c3917690 100644
--- a/airflow/providers/google/cloud/operators/dataplex.py
+++ b/airflow/providers/google/cloud/operators/dataplex.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Dataplex operators."""
+
from __future__ import annotations
from time import sleep
@@ -25,12 +26,16 @@ if TYPE_CHECKING:
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry, exponential_sleep_generator
-from google.cloud.dataplex_v1.types import Task
+from google.cloud.dataplex_v1.types import Lake, Task
from googleapiclient.errors import HttpError
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.dataplex import DataplexHook
-from airflow.providers.google.cloud.links.dataplex import DataplexTaskLink, DataplexTasksLink
+from airflow.providers.google.cloud.links.dataplex import (
+ DataplexLakeLink,
+ DataplexTaskLink,
+ DataplexTasksLink,
+)
class DataplexCreateTaskOperator(BaseOperator):
@@ -427,4 +432,214 @@ class DataplexGetTaskOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ DataplexTasksLink.persist(context=context, task_instance=self)
return Task.to_dict(task)
+
+
+class DataplexCreateLakeOperator(BaseOperator):
+    """
+    Creates a lake resource within a project and region.
+
+    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
+    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
+    :param lake_id: Required. Lake identifier.
+    :param body: Required. The Request body contains an instance of Lake.
+    :param validate_only: Optional. Only validate the request, but do not perform mutations. The default is
+        false.
+    :param api_version: The version of the api that will be requested for example 'v1'.
+    :param retry: A retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :param timeout: The amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+    :param metadata: Additional metadata that is provided to the method.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :param asynchronous: Flag informing should the Dataplex lake be created asynchronously.
+        When set, the operator only submits the create request and returns whether the
+        operation is already done, without waiting for the lake; readiness has to be
+        checked separately.
+    """
+
+    template_fields = (
+        "project_id",
+        "lake_id",
+        "body",
+        "validate_only",
+        "delegate_to",
+        "impersonation_chain",
+    )
+    template_fields_renderers = {"body": "json"}
+    operator_extra_links = (DataplexLakeLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        body: dict[str, Any],
+        validate_only: bool | None = None,
+        api_version: str = "v1",
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        asynchronous: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.lake_id = lake_id
+        self.body = body
+        self.validate_only = validate_only
+        self.api_version = api_version
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        self.asynchronous = asynchronous
+
+    def execute(self, context: Context) -> dict | bool:
+        """
+        Create the lake, or reuse an existing one with the same ID.
+
+        :return: The lake as a dict, or, when ``asynchronous`` is set, whether the
+            create operation was already done at submission time.
+        """
+        # DataplexHook talks to the GAPIC DataplexServiceClient, which reports a lake that
+        # already exists as google.api_core's AlreadyExists rather than googleapiclient's
+        # HttpError; both are treated as "already exists".
+        from google.api_core.exceptions import AlreadyExists
+
+        hook = DataplexHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Creating Dataplex lake %s", self.lake_id)
+
+        try:
+            operation = hook.create_lake(
+                project_id=self.project_id,
+                region=self.region,
+                lake_id=self.lake_id,
+                body=self.body,
+                validate_only=self.validate_only,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+        except AlreadyExists:
+            self.log.info("Lake %s already exists", self.lake_id)
+            lake = self._wait_for_lake_ready(hook)
+        except HttpError as err:
+            if err.resp.status not in (409, "409"):
+                raise
+            self.log.info("Lake %s already exists", self.lake_id)
+            lake = self._wait_for_lake_ready(hook)
+        else:
+            if self.asynchronous:
+                is_done = operation.done()
+                self.log.info("Is operation done already? %s", is_done)
+                return is_done
+            self.log.info("Waiting for Dataplex lake %s to be created", self.lake_id)
+            lake = hook.wait_for_operation(timeout=self.timeout, operation=operation)
+            self.log.info("Lake %s created successfully", self.lake_id)
+        DataplexLakeLink.persist(
+            context=context,
+            task_instance=self,
+        )
+        return Lake.to_dict(lake)
+
+    def _wait_for_lake_ready(self, hook: DataplexHook) -> Lake:
+        """Poll an existing lake with exponential back-off until it is no longer CREATING."""
+        for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
+            lake = hook.get_lake(
+                project_id=self.project_id,
+                region=self.region,
+                lake_id=self.lake_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            # The result is a Lake message (it is later fed to Lake.to_dict), so it cannot
+            # be subscripted; render enums by name to compare the state with "CREATING".
+            if Lake.to_dict(lake, use_integers_for_enums=False)["state"] != "CREATING":
+                break
+            sleep(time_to_wait)
+        return lake
+
+
+class DataplexDeleteLakeOperator(BaseOperator):
+    """
+    Delete a Dataplex lake and wait for the deletion to finish.
+
+    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
+    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
+    :param lake_id: Required. Lake identifier.
+    :param api_version: The version of the api that will be requested for example 'v1'.
+    :param retry: A retry object used to retry requests. If `None` is specified, requests
+        will not be retried.
+    :param timeout: The amount of time, in seconds, to wait for the request to complete.
+        Note that if `retry` is specified, the timeout applies to each individual attempt.
+        The same value bounds the wait for the delete operation.
+    :param metadata: Additional metadata that is provided to the method.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    """
+
+    template_fields = ("project_id", "lake_id", "delegate_to", "impersonation_chain")
+    operator_extra_links = (DataplexLakeLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        api_version: str = "v1",
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        # Lake coordinates.
+        self.project_id = project_id
+        self.region = region
+        self.lake_id = lake_id
+        # Request behaviour.
+        self.api_version = api_version
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        # Authentication.
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Context) -> None:
+        hook = DataplexHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Deleting Dataplex lake %s", self.lake_id)
+
+        delete_operation = hook.delete_lake(
+            project_id=self.project_id,
+            region=self.region,
+            lake_id=self.lake_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        # The link is recorded as soon as the request is accepted, before blocking on it.
+        DataplexLakeLink.persist(context=context, task_instance=self)
+        hook.wait_for_operation(timeout=self.timeout, operation=delete_operation)
+        self.log.info("Dataplex lake %s deleted successfully!", self.lake_id)
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index fb0a3d434c..10c47c7d54 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -964,6 +964,7 @@ extra-links:
- airflow.providers.google.cloud.links.cloud_sql.CloudSQLInstanceDatabaseLink
- airflow.providers.google.cloud.links.dataplex.DataplexTaskLink
- airflow.providers.google.cloud.links.dataplex.DataplexTasksLink
+ - airflow.providers.google.cloud.links.dataplex.DataplexLakeLink
- airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink
- airflow.providers.google.cloud.links.bigquery.BigQueryTableLink
- airflow.providers.google.cloud.links.bigquery_dts.BigQueryDataTransferConfigLink
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
index 00b1949a0a..c6ed5db7da 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -31,7 +31,7 @@ For more information about the available fields to pass when creating a task, vi
A simple task configuration can look as followed:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
:language: python
:dedent: 0
:start-after: [START howto_dataplex_configuration]
@@ -40,13 +40,13 @@ A simple task configuration can look as followed:
With this configuration we can create the task both synchronously & asynchronously:
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateTaskOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_create_task_operator]
:end-before: [END howto_dataplex_create_task_operator]
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_async_create_task_operator]
@@ -59,7 +59,7 @@ To delete a task you can use:
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteTaskOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_delete_task_operator]
@@ -72,7 +72,7 @@ To list tasks you can use:
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexListTasksOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_list_tasks_operator]
@@ -85,7 +85,7 @@ To get a task you can use:
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetTaskOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_get_task_operator]
@@ -98,8 +98,47 @@ To wait for a task created asynchronously you can use:
:class:`~airflow.providers.google.cloud.sensors.dataplex.DataplexTaskStateSensor`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
:language: python
:dedent: 4
:start-after: [START howto_dataplex_task_state_sensor]
:end-before: [END howto_dataplex_task_state_sensor]
+
+Create a Lake
+-------------
+
+Before you create a Dataplex lake you need to define its body.
+
+For more information about the available fields to pass when creating a lake, visit `Dataplex create lake API. <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes#Lake>`__
+
+A simple lake configuration can look as follows:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
+ :language: python
+ :dedent: 0
+ :start-after: [START howto_dataplex_lake_configuration]
+ :end-before: [END howto_dataplex_lake_configuration]
+
+With this configuration we can create the lake:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateLakeOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_create_lake_operator]
+ :end-before: [END howto_dataplex_create_lake_operator]
+
+
+Delete a lake
+-------------
+
+To delete a lake you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteLakeOperator`
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_delete_lake_operator]
+ :end-before: [END howto_dataplex_delete_lake_operator]
diff --git a/tests/providers/google/cloud/hooks/test_dataplex.py b/tests/providers/google/cloud/hooks/test_dataplex.py
index 562b60a0e4..9a11c01bad 100644
--- a/tests/providers/google/cloud/hooks/test_dataplex.py
+++ b/tests/providers/google/cloud/hooks/test_dataplex.py
@@ -121,3 +121,53 @@ class TestDataplexHook(TestCase):
timeout=None,
metadata=(),
)
+
+    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    def test_create_lake(self, mock_client):
+        expected_request = {
+            "parent": f"projects/{PROJECT_ID}/locations/{REGION}",
+            "lake_id": LAKE_ID,
+            "lake": BODY,
+        }
+
+        self.hook.create_lake(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            body=BODY,
+            validate_only=None,
+        )
+
+        mock_client.return_value.create_lake.assert_called_once_with(
+            request=expected_request,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    def test_delete_lake(self, mock_client):
+        expected_name = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}"
+
+        self.hook.delete_lake(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID)
+
+        mock_client.return_value.delete_lake.assert_called_once_with(
+            request={"name": expected_name},
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    def test_get_lake(self, mock_client):
+        # The hook builds this name with a trailing slash; pinned here as-is.
+        expected_name = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/"
+
+        self.hook.get_lake(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID)
+
+        mock_client.return_value.get_lake.assert_called_once_with(
+            request={"name": expected_name},
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/tests/providers/google/cloud/operators/test_dataplex.py b/tests/providers/google/cloud/operators/test_dataplex.py
index 8ecd396780..77115e2066 100644
--- a/tests/providers/google/cloud/operators/test_dataplex.py
+++ b/tests/providers/google/cloud/operators/test_dataplex.py
@@ -21,7 +21,9 @@ from unittest import TestCase, mock
from google.api_core.gapic_v1.method import DEFAULT
from airflow.providers.google.cloud.operators.dataplex import (
+ DataplexCreateLakeOperator,
DataplexCreateTaskOperator,
+ DataplexDeleteLakeOperator,
DataplexDeleteTaskOperator,
DataplexGetTaskOperator,
DataplexListTasksOperator,
@@ -29,11 +31,18 @@ from airflow.providers.google.cloud.operators.dataplex import (
HOOK_STR = "airflow.providers.google.cloud.operators.dataplex.DataplexHook"
TASK_STR = "airflow.providers.google.cloud.operators.dataplex.Task"
+LAKE_STR = "airflow.providers.google.cloud.operators.dataplex.Lake"
PROJECT_ID = "project-id"
REGION = "region"
LAKE_ID = "lake-id"
BODY = {"body": "test"}
+# Minimal Lake body used by the operator tests; "labels" is a string-to-string map
+# in the Lake resource, so an empty dict (not a list) is the correct empty value.
+BODY_LAKE = {
+    "display_name": "test_display_name",
+    "labels": {},
+    "description": "test_description",
+    "metastore": {"service": ""},
+}
DATAPLEX_TASK_ID = "testTask001"
GCP_CONN_ID = "google_cloud_default"
@@ -180,3 +189,70 @@ class TestDataplexGetTaskOperator(TestCase):
timeout=None,
metadata=(),
)
+
+
+class TestDataplexDeleteLakeOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock):
+        op = DataplexDeleteLakeOperator(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            task_id="delete_dataplex_lake",
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.delete_lake.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+        # The operator must block on the operation returned by delete_lake.
+        hook_mock.return_value.wait_for_operation.assert_called_once_with(
+            timeout=None,
+            operation=hook_mock.return_value.delete_lake.return_value,
+        )
+
+
+class TestDataplexCreateLakeOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    @mock.patch(LAKE_STR)
+    def test_execute(self, lake_mock, hook_mock):
+        op = DataplexCreateLakeOperator(
+            task_id="create_dataplex_lake",
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            body=BODY_LAKE,
+            validate_only=None,
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.wait_for_operation.return_value = None
+        # The operator calls Lake.to_dict(...) on the class, so configure the class
+        # attribute rather than return_value (which would be an instance).
+        lake_mock.to_dict.return_value = {"name": LAKE_ID}
+        result = op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.create_lake.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            body=BODY_LAKE,
+            validate_only=None,
+            retry=DEFAULT,
+            timeout=None,
+            metadata=(),
+        )
+        hook_mock.return_value.wait_for_operation.assert_called_once_with(
+            timeout=None,
+            operation=hook_mock.return_value.create_lake.return_value,
+        )
+        lake_mock.to_dict.assert_called_once_with(None)
+        assert result == {"name": LAKE_ID}
diff --git a/tests/providers/google/cloud/operators/test_dataplex_system.py b/tests/providers/google/cloud/operators/test_dataplex_system.py
deleted file mode 100644
index b3b9c709d4..0000000000
--- a/tests/providers/google/cloud/operators/test_dataplex_system.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import pytest
-
-from airflow.providers.google.cloud.example_dags.example_dataplex import BUCKET, SPARK_FILE_NAME
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-GCS_URI = f"gs://{BUCKET}"
-
-spark_file = """
-#!/usr/bin/python
-print("### Hello, dataplex! ###")
-"""
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_GCS_KEY)
-class DataplexExampleDagsTest(GoogleSystemTest):
- def setUp(self):
- super().setUp()
- self.create_gcs_bucket(BUCKET)
- self.upload_content_to_gcs(lines=spark_file, bucket=GCS_URI, filename=SPARK_FILE_NAME)
-
- def tearDown(self):
- self.delete_gcs_bucket(BUCKET)
- super().tearDown()
-
- @provide_gcp_context(GCP_GCS_KEY)
- def test_run_example_dag(self):
- self.run_dag(dag_id="example_dataplex", dag_folder=CLOUD_DAG_FOLDER)
diff --git a/tests/system/providers/google/cloud/dataplex/__init__.py b/tests/system/providers/google/cloud/dataplex/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataplex/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/google/cloud/example_dags/example_dataplex.py b/tests/system/providers/google/cloud/dataplex/example_dataplex.py
similarity index 53%
rename from airflow/providers/google/cloud/example_dags/example_dataplex.py
rename to tests/system/providers/google/cloud/dataplex/example_dataplex.py
index b12780176a..bc2b6462b8 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataplex.py
+++ b/tests/system/providers/google/cloud/dataplex/example_dataplex.py
@@ -21,25 +21,41 @@ from __future__ import annotations
import datetime
import os
+from pathlib import Path
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataplex import (
+ DataplexCreateLakeOperator,
DataplexCreateTaskOperator,
+ DataplexDeleteLakeOperator,
DataplexDeleteTaskOperator,
DataplexGetTaskOperator,
DataplexListTasksOperator,
)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.sensors.dataplex import DataplexTaskStateSensor
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "INVALID PROJECT ID")
-REGION = os.environ.get("GCP_REGION", "INVALID REGION")
-LAKE_ID = os.environ.get("GCP_LAKE_ID", "INVALID LAKE ID")
-SERVICE_ACC = os.environ.get("GCP_DATAPLEX_SERVICE_ACC", "XYZ@developer.gserviceaccount.com")
-BUCKET = os.environ.get("GCP_DATAPLEX_BUCKET", "INVALID BUCKET NAME")
-SPARK_FILE_NAME = os.environ.get("SPARK_FILE_NAME", "INVALID FILE NAME")
-SPARK_FILE_FULL_PATH = f"gs://{BUCKET}/{SPARK_FILE_NAME}"
-DATAPLEX_TASK_ID = "task001"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+# No defaults: both are None when unset. ENV_ID is embedded in the resource names below.
+DAG_ID = "example_dataplex"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+# PySpark script stored in ./resources next to this file; the DAG uploads it to BUCKET_NAME.
+SPARK_FILE_NAME = "spark_example_pi.py"
+CURRENT_FOLDER = Path(__file__).parent
+FILE_LOCAL_PATH = str(CURRENT_FOLDER / "resources" / SPARK_FILE_NAME)
+
+LAKE_ID = f"test-lake-{ENV_ID}"
+REGION = "us-central1"
+# No default: None when GCP_DATAPLEX_SERVICE_ACC is unset.
+SERVICE_ACC = os.environ.get("GCP_DATAPLEX_SERVICE_ACC")
+
+SPARK_FILE_FULL_PATH = f"gs://{BUCKET_NAME}/{SPARK_FILE_NAME}"
+DATAPLEX_TASK_ID = f"test-task-{ENV_ID}"
TRIGGER_SPEC_TYPE = "ON_DEMAND"
# [START howto_dataplex_configuration]
@@ -50,10 +66,39 @@ EXAMPLE_TASK_BODY = {
}
# [END howto_dataplex_configuration]
+# [START howto_dataplex_lake_configuration]
+EXAMPLE_LAKE_BODY = {
+ "display_name": "test_display_name",
+ "labels": {},  # Lake.labels is a map<string, string>, so use a dict, not a list
+ "description": "test_description",
+ "metastore": {"service": ""},
+}
+# [END howto_dataplex_lake_configuration]
+
+
with models.DAG(
- "example_dataplex",
+ DAG_ID,
start_date=datetime.datetime(2021, 1, 1),
+ schedule="@once",
+ tags=["example", "dataplex"],
) as dag:
+ # TEST SETUP: bucket that receives the PySpark script uploaded by upload_file.
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+ )
+
+ upload_file = LocalFilesystemToGCSOperator(
+ task_id="upload_file",
+ src=FILE_LOCAL_PATH,
+ dst=SPARK_FILE_NAME,
+ bucket=BUCKET_NAME,
+ )
+ # [START howto_dataplex_create_lake_operator]
+ create_lake = DataplexCreateLakeOperator(
+ project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
+ )
+ # [END howto_dataplex_create_lake_operator]
+
# [START howto_dataplex_create_task_operator]
create_dataplex_task = DataplexCreateTaskOperator(
project_id=PROJECT_ID,
@@ -71,19 +116,19 @@ with models.DAG(
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_TASK_BODY,
- dataplex_task_id=DATAPLEX_TASK_ID,
+ dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
asynchronous=True,
task_id="create_dataplex_task_async",
)
# [END howto_dataplex_async_create_task_operator]
# [START howto_dataplex_delete_task_operator]
- delete_dataplex_task = DataplexDeleteTaskOperator(
+ delete_dataplex_task_async = DataplexDeleteTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
- dataplex_task_id=DATAPLEX_TASK_ID,
- task_id="delete_dataplex_task",
+ dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
+ task_id="delete_dataplex_task_async",
)
# [END howto_dataplex_delete_task_operator]
@@ -113,11 +158,57 @@ with models.DAG(
)
# [END howto_dataplex_task_state_sensor]
+ # TEST TEARDOWN: TriggerRule.ALL_DONE lets cleanup run even if upstream tasks failed.
+ delete_dataplex_task = DataplexDeleteTaskOperator(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ task_id="delete_dataplex_task",
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ # Lake and bucket cleanup; chain() below runs these after delete_dataplex_task.
+ # [START howto_dataplex_delete_lake_operator]
+ delete_lake = DataplexDeleteLakeOperator(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ task_id="delete_lake",
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+ # [END howto_dataplex_delete_lake_operator]
+
+ delete_bucket = GCSDeleteBucketOperator(
+ task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+ )
+
chain(
+ # TEST SETUP
+ create_bucket,
+ upload_file,
+ # TEST BODY
+ create_lake,
create_dataplex_task,
get_dataplex_task,
list_dataplex_task,
- delete_dataplex_task,
create_dataplex_task_async,
+ delete_dataplex_task_async,
dataplex_task_state,
+ # TEST TEARDOWN
+ delete_dataplex_task,
+ delete_lake,
+ delete_bucket,
)
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/google/cloud/dataplex/resources/__init__.py b/tests/system/providers/google/cloud/dataplex/resources/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataplex/resources/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/system/providers/google/cloud/dataplex/resources/spark_example_pi.py b/tests/system/providers/google/cloud/dataplex/resources/spark_example_pi.py
new file mode 100644
index 0000000000..77a7cb9e6c
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataplex/resources/spark_example_pi.py
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import sys
+from operator import add
+from random import random
+
+from pyspark.sql import SparkSession
+
+if __name__ == "__main__":
+ # Usage: spark_example_pi.py [partitions]  (partitions defaults to 2)
+ # Monte Carlo estimate: the share of random points in [-1, 1] x [-1, 1] that
+ # land inside the unit circle tends to Pi / 4, hence the factor 4.0 below.
+ spark = SparkSession.builder.appName("PythonPi").getOrCreate()
+
+ partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
+ n = 100000 * partitions
+
+ def f(_: int) -> int:  # 1 if a random point falls inside the unit circle, else 0
+ x = random() * 2 - 1
+ y = random() * 2 - 1
+ return 1 if x**2 + y**2 <= 1 else 0
+
+ count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+ print(f"Pi is roughly {4.0 * count / n:f}")
+
+ spark.stop()