Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/03 11:36:12 UTC

[airflow] branch main updated: Workflows assets & system tests migration (AIP-47) (#24105)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e13b15946e Workflows assets & system tests migration (AIP-47) (#24105)
e13b15946e is described below

commit e13b15946ee2db956040f81ca374cb4619d07cf1
Author: Wojciech Januszek <wj...@sigma.ug.edu.pl>
AuthorDate: Fri Jun 3 13:36:05 2022 +0200

    Workflows assets & system tests migration (AIP-47) (#24105)
    
    * Workflows assets & system tests migration (AIP-47)
    
    Co-authored-by: Wojciech Januszek <ja...@google.com>
---
 airflow/providers/google/cloud/links/workflows.py  | 103 +++++++++++++++++++++
 .../providers/google/cloud/operators/workflows.py  |  85 +++++++++++++++++
 .../providers/google/cloud/sensors/workflows.py    |   2 +-
 airflow/providers/google/provider.yaml             |   3 +
 .../operators/cloud/workflows.rst                  |  22 ++---
 .../google/cloud/operators/test_workflows.py       |  35 +++++--
 .../cloud/operators/test_workflows_system.py       |  35 -------
 .../google/workflows}/example_workflows.py         |  57 +++++++++---
 8 files changed, 272 insertions(+), 70 deletions(-)

diff --git a/airflow/providers/google/cloud/links/workflows.py b/airflow/providers/google/cloud/links/workflows.py
new file mode 100644
index 0000000000..db8022c130
--- /dev/null
+++ b/airflow/providers/google/cloud/links/workflows.py
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Workflows links."""
+from typing import TYPE_CHECKING, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+WORKFLOWS_BASE_LINK = "https://console.cloud.google.com/workflows"
+WORKFLOW_LINK = WORKFLOWS_BASE_LINK + "/workflow/{location_id}/{workflow_id}/executions?project={project_id}"
+WORKFLOWS_LINK = WORKFLOWS_BASE_LINK + "?project={project_id}"
+EXECUTION_LINK = (
+    WORKFLOWS_BASE_LINK
+    + "/workflow/{location_id}/{workflow_id}/execution/{execution_id}?project={project_id}"
+)
+
+
+class WorkflowsWorkflowDetailsLink(BaseGoogleLink):
+    """Helper class for constructing Workflow details Link"""
+
+    name = "Workflow details"
+    key = "workflow_details"
+    format_str = WORKFLOW_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        location_id: str,
+        workflow_id: str,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=WorkflowsWorkflowDetailsLink.key,
+            value={"location_id": location_id, "workflow_id": workflow_id, "project_id": project_id},
+        )
+
+
+class WorkflowsListOfWorkflowsLink(BaseGoogleLink):
+    """Helper class for constructing list of Workflows Link"""
+
+    name = "List of workflows"
+    key = "list_of_workflows"
+    format_str = WORKFLOWS_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=WorkflowsListOfWorkflowsLink.key,
+            value={"project_id": project_id},
+        )
+
+
+class WorkflowsExecutionLink(BaseGoogleLink):
+    """Helper class for constructing Workflows Execution Link"""
+
+    name = "Workflow Execution"
+    key = "workflow_execution"
+    format_str = EXECUTION_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance: BaseOperator,
+        location_id: str,
+        workflow_id: str,
+        execution_id: str,
+        project_id: Optional[str],
+    ):
+        task_instance.xcom_push(
+            context,
+            key=WorkflowsExecutionLink.key,
+            value={
+                "location_id": location_id,
+                "workflow_id": workflow_id,
+                "execution_id": execution_id,
+                "project_id": project_id,
+            },
+        )
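
For orientation, a minimal sketch (not part of the patch; all identifiers are
illustrative placeholders) of what the format strings above resolve to once
persist() has pushed its parameters:

    # Using EXECUTION_LINK as defined above; values are placeholders.
    url = EXECUTION_LINK.format(
        location_id="us-central1",
        workflow_id="my-workflow",
        execution_id="abc123",
        project_id="my-project",
    )
    # -> https://console.cloud.google.com/workflows/workflow/us-central1/my-workflow/execution/abc123?project=my-project
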
diff --git a/airflow/providers/google/cloud/operators/workflows.py b/airflow/providers/google/cloud/operators/workflows.py
index 157d34a057..e1cb45659a 100644
--- a/airflow/providers/google/cloud/operators/workflows.py
+++ b/airflow/providers/google/cloud/operators/workflows.py
@@ -31,6 +31,11 @@ from google.protobuf.field_mask_pb2 import FieldMask
 
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
+from airflow.providers.google.cloud.links.workflows import (
+    WorkflowsExecutionLink,
+    WorkflowsListOfWorkflowsLink,
+    WorkflowsWorkflowDetailsLink,
+)
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -60,6 +65,7 @@ class WorkflowsCreateWorkflowOperator(BaseOperator):
 
     template_fields: Sequence[str] = ("location", "workflow", "workflow_id")
     template_fields_renderers = {"workflow": "json"}
+    operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
 
     def __init__(
         self,
@@ -132,6 +138,15 @@ class WorkflowsCreateWorkflowOperator(BaseOperator):
                 timeout=self.timeout,
                 metadata=self.metadata,
             )
+
+        WorkflowsWorkflowDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            location_id=self.location,
+            workflow_id=self.workflow_id,
+            project_id=self.project_id or hook.project_id,
+        )
+
         return Workflow.to_dict(workflow)
 
 
@@ -162,6 +177,7 @@ class WorkflowsUpdateWorkflowOperator(BaseOperator):
 
     template_fields: Sequence[str] = ("workflow_id", "update_mask")
     template_fields_renderers = {"update_mask": "json"}
+    operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
 
     def __init__(
         self,
@@ -209,6 +225,15 @@ class WorkflowsUpdateWorkflowOperator(BaseOperator):
             metadata=self.metadata,
         )
         workflow = operation.result()
+
+        WorkflowsWorkflowDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            location_id=self.location,
+            workflow_id=self.workflow_id,
+            project_id=self.project_id or hook.project_id,
+        )
+
         return Workflow.to_dict(workflow)
 
 
@@ -296,6 +321,7 @@ class WorkflowsListWorkflowsOperator(BaseOperator):
     """
 
     template_fields: Sequence[str] = ("location", "order_by", "filter_")
+    operator_extra_links = (WorkflowsListOfWorkflowsLink(),)
 
     def __init__(
         self,
@@ -335,6 +361,13 @@ class WorkflowsListWorkflowsOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+
+        WorkflowsListOfWorkflowsLink.persist(
+            context=context,
+            task_instance=self,
+            project_id=self.project_id or hook.project_id,
+        )
+
         return [Workflow.to_dict(w) for w in workflows_iter]
 
 
@@ -357,6 +390,7 @@ class WorkflowsGetWorkflowOperator(BaseOperator):
     """
 
     template_fields: Sequence[str] = ("location", "workflow_id")
+    operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
 
     def __init__(
         self,
@@ -393,6 +427,15 @@ class WorkflowsGetWorkflowOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+
+        WorkflowsWorkflowDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            location_id=self.location,
+            workflow_id=self.workflow_id,
+            project_id=self.project_id or hook.project_id,
+        )
+
         return Workflow.to_dict(workflow)
 
 
@@ -418,6 +461,7 @@ class WorkflowsCreateExecutionOperator(BaseOperator):
 
     template_fields: Sequence[str] = ("location", "workflow_id", "execution")
     template_fields_renderers = {"execution": "json"}
+    operator_extra_links = (WorkflowsExecutionLink(),)
 
     def __init__(
         self,
@@ -459,6 +503,16 @@ class WorkflowsCreateExecutionOperator(BaseOperator):
         )
         execution_id = execution.name.split("/")[-1]
         self.xcom_push(context, key="execution_id", value=execution_id)
+
+        WorkflowsExecutionLink.persist(
+            context=context,
+            task_instance=self,
+            location_id=self.location,
+            workflow_id=self.workflow_id,
+            execution_id=execution_id,
+            project_id=self.project_id or hook.project_id,
+        )
+
         return Execution.to_dict(execution)
 
 
@@ -482,6 +536,7 @@ class WorkflowsCancelExecutionOperator(BaseOperator):
     """
 
     template_fields: Sequence[str] = ("location", "workflow_id", "execution_id")
+    operator_extra_links = (WorkflowsExecutionLink(),)
 
     def __init__(
         self,
@@ -521,6 +576,16 @@ class WorkflowsCancelExecutionOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+
+        WorkflowsExecutionLink.persist(
+            context=context,
+            task_instance=self,
+            location_id=self.location,
+            workflow_id=self.workflow_id,
+            execution_id=self.execution_id,
+            project_id=self.project_id or hook.project_id,
+        )
+
         return Execution.to_dict(execution)
 
 
@@ -549,6 +614,7 @@ class WorkflowsListExecutionsOperator(BaseOperator):
     """
 
     template_fields: Sequence[str] = ("location", "workflow_id")
+    operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
 
     def __init__(
         self,
@@ -588,6 +654,14 @@ class WorkflowsListExecutionsOperator(BaseOperator):
             metadata=self.metadata,
         )
 
+        WorkflowsWorkflowDetailsLink.persist(
+            context=context,
+            task_instance=self,
+            location_id=self.location,
+            workflow_id=self.workflow_id,
+            project_id=self.project_id or hook.project_id,
+        )
+
         return [Execution.to_dict(e) for e in execution_iter if e.start_time > self.start_date_filter]
 
 
@@ -611,6 +685,7 @@ class WorkflowsGetExecutionOperator(BaseOperator):
     """
 
     template_fields: Sequence[str] = ("location", "workflow_id", "execution_id")
+    operator_extra_links = (WorkflowsExecutionLink(),)
 
     def __init__(
         self,
@@ -650,4 +725,14 @@ class WorkflowsGetExecutionOperator(BaseOperator):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+
+        WorkflowsExecutionLink.persist(
+            context=context,
+            task_instance=self,
+            location_id=self.location,
+            workflow_id=self.workflow_id,
+            execution_id=self.execution_id,
+            project_id=self.project_id or hook.project_id,
+        )
+
         return Execution.to_dict(execution)
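
Each operator keeps returning a plain dict, and WorkflowsCreateExecutionOperator
still pushes the bare execution_id XCom alongside the new link, so existing DAG
wiring keeps working. A minimal sketch of consuming that XCom downstream
(placeholder names and values, omitted arguments assumed to use their defaults):

    from airflow.providers.google.cloud.operators.workflows import (
        WorkflowsCreateExecutionOperator,
        WorkflowsGetExecutionOperator,
    )

    create_execution = WorkflowsCreateExecutionOperator(
        task_id="create_execution",
        location="us-central1",
        workflow_id="my-workflow",
        execution={"argument": ""},
    )
    get_execution = WorkflowsGetExecutionOperator(
        task_id="get_execution",
        location="us-central1",
        workflow_id="my-workflow",
        # feed the pushed execution_id into the follow-up task
        execution_id=create_execution.output["execution_id"],
    )
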
diff --git a/airflow/providers/google/cloud/sensors/workflows.py b/airflow/providers/google/cloud/sensors/workflows.py
index 8560fd3ee2..58fa872a0d 100644
--- a/airflow/providers/google/cloud/sensors/workflows.py
+++ b/airflow/providers/google/cloud/sensors/workflows.py
@@ -56,7 +56,7 @@ class WorkflowExecutionSensor(BaseSensorOperator):
         workflow_id: str,
         execution_id: str,
         location: str,
-        project_id: str,
+        project_id: Optional[str] = None,
         success_states: Optional[Set[Execution.State]] = None,
         failure_states: Optional[Set[Execution.State]] = None,
         retry: Union[Retry, _MethodDefault] = DEFAULT,
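
With project_id now optional, the sensor can be declared without a project,
assuming the hook falls back to the default project of the GCP connection (the
usual google-provider behavior). A hedged sketch with placeholder values:

    from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor

    wait_for_execution = WorkflowExecutionSensor(
        task_id="wait_for_execution",
        location="us-central1",
        workflow_id="my-workflow",
        execution_id="abc123",  # placeholder execution id
        # project_id omitted: assumed to resolve from the connection default
    )
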
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index e61f2dfc3c..e693935145 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -908,6 +908,9 @@ extra-links:
   - airflow.providers.google.cloud.links.vertex_ai.VertexAIBatchPredictionJobListLink
   - airflow.providers.google.cloud.links.vertex_ai.VertexAIEndpointLink
   - airflow.providers.google.cloud.links.vertex_ai.VertexAIEndpointListLink
+  - airflow.providers.google.cloud.links.workflows.WorkflowsWorkflowDetailsLink
+  - airflow.providers.google.cloud.links.workflows.WorkflowsListOfWorkflowsLink
+  - airflow.providers.google.cloud.links.workflows.WorkflowsExecutionLink
   - airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentLink
   - airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
   - airflow.providers.google.cloud.links.dataflow.DataflowJobLink
diff --git a/docs/apache-airflow-providers-google/operators/cloud/workflows.rst b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst
index 09685491f2..0cea43f0f7 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/workflows.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst
@@ -39,7 +39,7 @@ Create workflow
 To create a workflow use
 :class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCreateWorkflowOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
       :language: python
       :dedent: 4
       :start-after: [START how_to_create_workflow]
@@ -47,7 +47,7 @@ To create a workflow use
 
 The workflow should be defined in a similar way to this example:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
       :language: python
       :dedent: 0
       :start-after: [START how_to_define_workflow]
@@ -65,7 +65,7 @@ Update workflow
 To update a workflow use
 :class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsUpdateWorkflowOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
       :language: python
       :dedent: 4
       :start-after: [START how_to_update_workflow]
@@ -79,7 +79,7 @@ Get workflow
 To get a workflow use
 :class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsGetWorkflowOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
       :language: python
       :dedent: 4
       :start-after: [START how_to_get_workflow]
@@ -93,7 +93,7 @@ List workflows
 To list workflows use
 :class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsListWorkflowsOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
       :language: python
       :dedent: 4
       :start-after: [START how_to_list_workflows]
@@ -107,7 +107,7 @@ Delete workflow
 To delete a workflow use
 :class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsDeleteWorkflowOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
       :language: python
       :dedent: 4
       :start-after: [START how_to_delete_workflow]
@@ -122,7 +122,7 @@ To create an execution use
 :class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCreateExecutionOperator`.
 This operator is not idempotent due to an API limitation.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
       :language: python
       :dedent: 4
       :start-after: [START how_to_create_execution]
@@ -131,7 +131,7 @@ This operator is not idempotent due to an API limitation.
 The create operator does not wait for the execution to complete. To wait for the execution result, use
 :class:`~airflow.providers.google.cloud.sensors.workflows.WorkflowExecutionSensor`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
       :language: python
       :dedent: 4
       :start-after: [START how_to_wait_for_execution]
@@ -145,7 +145,7 @@ Get execution
 To get an execution use
 :class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsGetExecutionOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
       :language: python
       :dedent: 4
       :start-after: [START how_to_get_execution]
@@ -160,7 +160,7 @@ To list executions use
 :class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsListExecutionsOperator`.
 By default, this operator returns only executions from the last 60 minutes.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
       :language: python
       :dedent: 4
       :start-after: [START how_to_list_executions]
@@ -174,7 +174,7 @@ Cancel execution
 To cancel an execution use
 :class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCancelExecutionOperator`.
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
       :language: python
       :dedent: 4
       :start-after: [START how_to_cancel_execution]
diff --git a/tests/providers/google/cloud/operators/test_workflows.py b/tests/providers/google/cloud/operators/test_workflows.py
index 5578548ffb..0ba4a64298 100644
--- a/tests/providers/google/cloud/operators/test_workflows.py
+++ b/tests/providers/google/cloud/operators/test_workflows.py
@@ -64,7 +64,8 @@ class TestWorkflowsCreateWorkflowOperator:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        result = op.execute({})
+        context = mock.MagicMock()
+        result = op.execute(context=context)
 
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -100,7 +101,8 @@ class TestWorkflowsUpdateWorkflowOperator:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        result = op.execute({})
+        context = mock.MagicMock()
+        result = op.execute(context=context)
 
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -181,7 +183,8 @@ class TestWorkflowsListWorkflowsOperator:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        result = op.execute({})
+        context = mock.MagicMock()
+        result = op.execute(context=context)
 
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -216,7 +219,8 @@ class TestWorkflowsGetWorkflowOperator:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        result = op.execute({})
+        context = mock.MagicMock()
+        result = op.execute(context=context)
 
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -253,7 +257,8 @@ class TestWorkflowExecutionsCreateExecutionOperator:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        result = op.execute({})
+        context = mock.MagicMock()
+        result = op.execute(context=context)
 
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -269,7 +274,16 @@ class TestWorkflowExecutionsCreateExecutionOperator:
             timeout=TIMEOUT,
             metadata=METADATA,
         )
-        mock_xcom.assert_called_once_with({}, key="execution_id", value="execution_id")
+        mock_xcom.assert_called_with(
+            context,
+            key="workflow_execution",
+            value={
+                'location_id': LOCATION,
+                'workflow_id': WORKFLOW_ID,
+                'execution_id': EXECUTION_ID,
+                'project_id': PROJECT_ID,
+            },
+        )
         assert result == mock_object.to_dict.return_value
 
 
@@ -289,7 +303,8 @@ class TestWorkflowExecutionsCancelExecutionOperator:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        result = op.execute({})
+        context = mock.MagicMock()
+        result = op.execute(context=context)
 
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -328,7 +343,8 @@ class TestWorkflowExecutionsListExecutionsOperator:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        result = op.execute({})
+        context = mock.MagicMock()
+        result = op.execute(context=context)
 
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
@@ -363,7 +379,8 @@ class TestWorkflowExecutionsGetExecutionOperator:
             gcp_conn_id=GCP_CONN_ID,
             impersonation_chain=IMPERSONATION_CHAIN,
         )
-        result = op.execute({})
+        context = mock.MagicMock()
+        result = op.execute(context=context)
 
         mock_hook.assert_called_once_with(
             gcp_conn_id=GCP_CONN_ID,
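
The tests switch from a plain dict to mock.MagicMock() because the new
persist() calls run through BaseOperator.xcom_push(context, ...), which
dereferences the context (e.g. context["ti"]). A sketch of why the mock
suffices, under that assumption:

    from unittest import mock

    context = mock.MagicMock()
    # Unlike {}, a MagicMock tolerates arbitrary item/attribute access,
    # so xcom_push can reach context["ti"] without raising a KeyError.
    context["ti"].xcom_push(key="execution_id", value="abc123")
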
diff --git a/tests/providers/google/cloud/operators/test_workflows_system.py b/tests/providers/google/cloud/operators/test_workflows_system.py
deleted file mode 100644
index 300552b048..0000000000
--- a/tests/providers/google/cloud/operators/test_workflows_system.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import pytest
-
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_WORKFLOWS_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
-@pytest.mark.system("google.cloud")
-@pytest.mark.credential_file(GCP_WORKFLOWS_KEY)
-class WorkflowsExampleDagsSystemTest(GoogleSystemTest):
-    def setUp(self):
-        super().setUp()
-
-    @provide_gcp_context(GCP_WORKFLOWS_KEY)
-    def test_run_example_workflow_dag(self):
-        self.run_dag('example_cloud_workflows', CLOUD_DAG_FOLDER)
-
-    def tearDown(self):
-        super().tearDown()
diff --git a/airflow/providers/google/cloud/example_dags/example_workflows.py b/tests/system/providers/google/workflows/example_workflows.py
similarity index 78%
rename from airflow/providers/google/cloud/example_dags/example_workflows.py
rename to tests/system/providers/google/workflows/example_workflows.py
index e2ca88cdf3..47ca818bf8 100644
--- a/airflow/providers/google/cloud/example_dags/example_workflows.py
+++ b/tests/system/providers/google/workflows/example_workflows.py
@@ -33,11 +33,15 @@ from airflow.providers.google.cloud.operators.workflows import (
     WorkflowsUpdateWorkflowOperator,
 )
 from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor
+from airflow.utils.trigger_rule import TriggerRule
 
-LOCATION = os.environ.get("GCP_WORKFLOWS_LOCATION", "us-central1")
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_WORKFLOW_ID", "airflow-test-workflow")
+DAG_ID = "cloud_workflows"
+
+LOCATION = "us-central1"
+WORKFLOW_ID = f"workflow-{DAG_ID}-{ENV_ID}"
 
 # [START how_to_define_workflow]
 WORKFLOW_CONTENT = """
@@ -67,7 +71,7 @@ WORKFLOW = {
 
 EXECUTION = {"argument": ""}
 
-SLEEP_WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_SLEEP_WORKFLOW_ID", "sleep_workflow")
+SLEEP_WORKFLOW_ID = f"sleep-workflow-{DAG_ID}-{ENV_ID}"
 SLEEP_WORKFLOW_CONTENT = """
 - someSleep:
     call: sys.sleep
@@ -83,7 +87,7 @@ SLEEP_WORKFLOW = {
 
 
 with DAG(
-    "example_cloud_workflows",
+    DAG_ID,
     schedule_interval='@once',
     start_date=datetime(2021, 1, 1),
     catchup=False,
@@ -99,8 +103,8 @@ with DAG(
     # [END how_to_create_workflow]
 
     # [START how_to_update_workflow]
-    update_workflows = WorkflowsUpdateWorkflowOperator(
-        task_id="update_workflows",
+    update_workflow = WorkflowsUpdateWorkflowOperator(
+        task_id="update_workflow",
         location=LOCATION,
         project_id=PROJECT_ID,
         workflow_id=WORKFLOW_ID,
@@ -127,6 +131,7 @@ with DAG(
         task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
     )
     # [END how_to_delete_workflow]
+    delete_workflow.trigger_rule = TriggerRule.ALL_DONE
 
     # [START how_to_create_execution]
     create_execution = WorkflowsCreateExecutionOperator(
@@ -182,21 +187,36 @@ with DAG(
         workflow_id=SLEEP_WORKFLOW_ID,
     )
 
+    cancel_execution_id = create_execution_for_cancel.output["execution_id"]
+
     # [START how_to_cancel_execution]
     cancel_execution = WorkflowsCancelExecutionOperator(
         task_id="cancel_execution",
         location=LOCATION,
         project_id=PROJECT_ID,
         workflow_id=SLEEP_WORKFLOW_ID,
-        execution_id=create_execution_id,
+        execution_id=cancel_execution_id,
     )
     # [END how_to_cancel_execution]
 
-    create_workflow >> update_workflows >> [get_workflow, list_workflows]
-    update_workflows >> [create_execution, create_execution_for_cancel]
+    delete_workflow_for_cancel = WorkflowsDeleteWorkflowOperator(
+        task_id="delete_workflow_for_cancel",
+        location=LOCATION,
+        project_id=PROJECT_ID,
+        workflow_id=SLEEP_WORKFLOW_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    create_workflow >> update_workflow >> [get_workflow, list_workflows]
+    update_workflow >> [create_execution, create_execution_for_cancel]
 
     wait_for_execution >> [get_execution, list_executions]
-    create_workflow_for_cancel >> create_execution_for_cancel >> cancel_execution
+    (
+        create_workflow_for_cancel
+        >> create_execution_for_cancel
+        >> cancel_execution
+        >> delete_workflow_for_cancel
+    )
 
     [cancel_execution, list_executions] >> delete_workflow
 
@@ -205,7 +225,16 @@ with DAG(
     #   create_execution >> get_execution
     #   create_execution >> cancel_execution
 
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
 
-if __name__ == '__main__':
-    dag.clear(dag_run_state=None)
-    dag.run()
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
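
A sketch of running the migrated example as a system test (placeholder
project/env values; tests/system/README.md has the canonical steps):

    SYSTEM_TESTS_GCP_PROJECT=my-project \
    SYSTEM_TESTS_ENV_ID=dev \
    pytest tests/system/providers/google/workflows/example_workflows.py
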