You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/03 11:36:12 UTC
[airflow] branch main updated: Workflows assets & system tests migration (AIP-47) (#24105)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e13b15946e Workflows assets & system tests migration (AIP-47) (#24105)
e13b15946e is described below
commit e13b15946ee2db956040f81ca374cb4619d07cf1
Author: Wojciech Januszek <wj...@sigma.ug.edu.pl>
AuthorDate: Fri Jun 3 13:36:05 2022 +0200
Workflows assets & system tests migration (AIP-47) (#24105)
* Workflows assets & system tests migration (AIP-47)
Co-authored-by: Wojciech Januszek <ja...@google.com>
---
airflow/providers/google/cloud/links/workflows.py | 103 +++++++++++++++++++++
.../providers/google/cloud/operators/workflows.py | 85 +++++++++++++++++
.../providers/google/cloud/sensors/workflows.py | 2 +-
airflow/providers/google/provider.yaml | 3 +
.../operators/cloud/workflows.rst | 22 ++---
.../google/cloud/operators/test_workflows.py | 35 +++++--
.../cloud/operators/test_workflows_system.py | 35 -------
.../google/workflows}/example_workflows.py | 57 +++++++++---
8 files changed, 272 insertions(+), 70 deletions(-)
diff --git a/airflow/providers/google/cloud/links/workflows.py b/airflow/providers/google/cloud/links/workflows.py
new file mode 100644
index 0000000000..db8022c130
--- /dev/null
+++ b/airflow/providers/google/cloud/links/workflows.py
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Workflows links."""
+from typing import TYPE_CHECKING, Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+WORKFLOWS_BASE_LINK = "https://console.cloud.google.com/workflows"
+WORKFLOW_LINK = WORKFLOWS_BASE_LINK + "/workflow/{location_id}/{workflow_id}/executions?project={project_id}"
+WORKFLOWS_LINK = WORKFLOWS_BASE_LINK + "?project={project_id}"
+EXECUTION_LINK = (
+ WORKFLOWS_BASE_LINK
+ + "/workflow/{location_id}/{workflow_id}/execution/{execution_id}?project={project_id}"
+)
+
+
+class WorkflowsWorkflowDetailsLink(BaseGoogleLink):
+ """Helper class for constructing Workflow details Link"""
+
+ name = "Workflow details"
+ key = "workflow_details"
+ format_str = WORKFLOW_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance: BaseOperator,
+ location_id: str,
+ workflow_id: str,
+ project_id: Optional[str],
+ ):
+ task_instance.xcom_push(
+ context,
+ key=WorkflowsWorkflowDetailsLink.key,
+ value={"location_id": location_id, "workflow_id": workflow_id, "project_id": project_id},
+ )
+
+
+class WorkflowsListOfWorkflowsLink(BaseGoogleLink):
+ """Helper class for constructing list of Workflows Link"""
+
+ name = "List of workflows"
+ key = "list_of_workflows"
+ format_str = WORKFLOWS_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance: BaseOperator,
+ project_id: Optional[str],
+ ):
+ task_instance.xcom_push(
+ context,
+ key=WorkflowsListOfWorkflowsLink.key,
+ value={"project_id": project_id},
+ )
+
+
+class WorkflowsExecutionLink(BaseGoogleLink):
+ """Helper class for constructing Workflows Execution Link"""
+
+ name = "Workflow Execution"
+ key = "workflow_execution"
+ format_str = EXECUTION_LINK
+
+ @staticmethod
+ def persist(
+ context: "Context",
+ task_instance: BaseOperator,
+ location_id: str,
+ workflow_id: str,
+ execution_id: str,
+ project_id: Optional[str],
+ ):
+ task_instance.xcom_push(
+ context,
+ key=WorkflowsExecutionLink.key,
+ value={
+ "location_id": location_id,
+ "workflow_id": workflow_id,
+ "execution_id": execution_id,
+ "project_id": project_id,
+ },
+ )
diff --git a/airflow/providers/google/cloud/operators/workflows.py b/airflow/providers/google/cloud/operators/workflows.py
index 157d34a057..e1cb45659a 100644
--- a/airflow/providers/google/cloud/operators/workflows.py
+++ b/airflow/providers/google/cloud/operators/workflows.py
@@ -31,6 +31,11 @@ from google.protobuf.field_mask_pb2 import FieldMask
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
+from airflow.providers.google.cloud.links.workflows import (
+ WorkflowsExecutionLink,
+ WorkflowsListOfWorkflowsLink,
+ WorkflowsWorkflowDetailsLink,
+)
if TYPE_CHECKING:
from airflow.utils.context import Context
@@ -60,6 +65,7 @@ class WorkflowsCreateWorkflowOperator(BaseOperator):
template_fields: Sequence[str] = ("location", "workflow", "workflow_id")
template_fields_renderers = {"workflow": "json"}
+ operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
def __init__(
self,
@@ -132,6 +138,15 @@ class WorkflowsCreateWorkflowOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+
+ WorkflowsWorkflowDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ location_id=self.location,
+ workflow_id=self.workflow_id,
+ project_id=self.project_id or hook.project_id,
+ )
+
return Workflow.to_dict(workflow)
@@ -162,6 +177,7 @@ class WorkflowsUpdateWorkflowOperator(BaseOperator):
template_fields: Sequence[str] = ("workflow_id", "update_mask")
template_fields_renderers = {"update_mask": "json"}
+ operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
def __init__(
self,
@@ -209,6 +225,15 @@ class WorkflowsUpdateWorkflowOperator(BaseOperator):
metadata=self.metadata,
)
workflow = operation.result()
+
+ WorkflowsWorkflowDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ location_id=self.location,
+ workflow_id=self.workflow_id,
+ project_id=self.project_id or hook.project_id,
+ )
+
return Workflow.to_dict(workflow)
@@ -296,6 +321,7 @@ class WorkflowsListWorkflowsOperator(BaseOperator):
"""
template_fields: Sequence[str] = ("location", "order_by", "filter_")
+ operator_extra_links = (WorkflowsListOfWorkflowsLink(),)
def __init__(
self,
@@ -335,6 +361,13 @@ class WorkflowsListWorkflowsOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+
+ WorkflowsListOfWorkflowsLink.persist(
+ context=context,
+ task_instance=self,
+ project_id=self.project_id or hook.project_id,
+ )
+
return [Workflow.to_dict(w) for w in workflows_iter]
@@ -357,6 +390,7 @@ class WorkflowsGetWorkflowOperator(BaseOperator):
"""
template_fields: Sequence[str] = ("location", "workflow_id")
+ operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
def __init__(
self,
@@ -393,6 +427,15 @@ class WorkflowsGetWorkflowOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+
+ WorkflowsWorkflowDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ location_id=self.location,
+ workflow_id=self.workflow_id,
+ project_id=self.project_id or hook.project_id,
+ )
+
return Workflow.to_dict(workflow)
@@ -418,6 +461,7 @@ class WorkflowsCreateExecutionOperator(BaseOperator):
template_fields: Sequence[str] = ("location", "workflow_id", "execution")
template_fields_renderers = {"execution": "json"}
+ operator_extra_links = (WorkflowsExecutionLink(),)
def __init__(
self,
@@ -459,6 +503,16 @@ class WorkflowsCreateExecutionOperator(BaseOperator):
)
execution_id = execution.name.split("/")[-1]
self.xcom_push(context, key="execution_id", value=execution_id)
+
+ WorkflowsExecutionLink.persist(
+ context=context,
+ task_instance=self,
+ location_id=self.location,
+ workflow_id=self.workflow_id,
+ execution_id=execution_id,
+ project_id=self.project_id or hook.project_id,
+ )
+
return Execution.to_dict(execution)
@@ -482,6 +536,7 @@ class WorkflowsCancelExecutionOperator(BaseOperator):
"""
template_fields: Sequence[str] = ("location", "workflow_id", "execution_id")
+ operator_extra_links = (WorkflowsExecutionLink(),)
def __init__(
self,
@@ -521,6 +576,16 @@ class WorkflowsCancelExecutionOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+
+ WorkflowsExecutionLink.persist(
+ context=context,
+ task_instance=self,
+ location_id=self.location,
+ workflow_id=self.workflow_id,
+ execution_id=self.execution_id,
+ project_id=self.project_id or hook.project_id,
+ )
+
return Execution.to_dict(execution)
@@ -549,6 +614,7 @@ class WorkflowsListExecutionsOperator(BaseOperator):
"""
template_fields: Sequence[str] = ("location", "workflow_id")
+ operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
def __init__(
self,
@@ -588,6 +654,14 @@ class WorkflowsListExecutionsOperator(BaseOperator):
metadata=self.metadata,
)
+ WorkflowsWorkflowDetailsLink.persist(
+ context=context,
+ task_instance=self,
+ location_id=self.location,
+ workflow_id=self.workflow_id,
+ project_id=self.project_id or hook.project_id,
+ )
+
return [Execution.to_dict(e) for e in execution_iter if e.start_time > self.start_date_filter]
@@ -611,6 +685,7 @@ class WorkflowsGetExecutionOperator(BaseOperator):
"""
template_fields: Sequence[str] = ("location", "workflow_id", "execution_id")
+ operator_extra_links = (WorkflowsExecutionLink(),)
def __init__(
self,
@@ -650,4 +725,14 @@ class WorkflowsGetExecutionOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+
+ WorkflowsExecutionLink.persist(
+ context=context,
+ task_instance=self,
+ location_id=self.location,
+ workflow_id=self.workflow_id,
+ execution_id=self.execution_id,
+ project_id=self.project_id or hook.project_id,
+ )
+
return Execution.to_dict(execution)
diff --git a/airflow/providers/google/cloud/sensors/workflows.py b/airflow/providers/google/cloud/sensors/workflows.py
index 8560fd3ee2..58fa872a0d 100644
--- a/airflow/providers/google/cloud/sensors/workflows.py
+++ b/airflow/providers/google/cloud/sensors/workflows.py
@@ -56,7 +56,7 @@ class WorkflowExecutionSensor(BaseSensorOperator):
workflow_id: str,
execution_id: str,
location: str,
- project_id: str,
+ project_id: Optional[str] = None,
success_states: Optional[Set[Execution.State]] = None,
failure_states: Optional[Set[Execution.State]] = None,
retry: Union[Retry, _MethodDefault] = DEFAULT,
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index e61f2dfc3c..e693935145 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -908,6 +908,9 @@ extra-links:
- airflow.providers.google.cloud.links.vertex_ai.VertexAIBatchPredictionJobListLink
- airflow.providers.google.cloud.links.vertex_ai.VertexAIEndpointLink
- airflow.providers.google.cloud.links.vertex_ai.VertexAIEndpointListLink
+ - airflow.providers.google.cloud.links.workflows.WorkflowsWorkflowDetailsLink
+ - airflow.providers.google.cloud.links.workflows.WorkflowsListOfWorkflowsLink
+ - airflow.providers.google.cloud.links.workflows.WorkflowsExecutionLink
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentLink
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
- airflow.providers.google.cloud.links.dataflow.DataflowJobLink
diff --git a/docs/apache-airflow-providers-google/operators/cloud/workflows.rst b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst
index 09685491f2..0cea43f0f7 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/workflows.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/workflows.rst
@@ -39,7 +39,7 @@ Create workflow
To create a workflow use
:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCreateWorkflowOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
:language: python
:dedent: 4
:start-after: [START how_to_create_workflow]
@@ -47,7 +47,7 @@ To create a workflow use
The workflow should be defined in a similar way to this example:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
:language: python
:dedent: 0
:start-after: [START how_to_define_workflow]
@@ -65,7 +65,7 @@ Update workflow
To update a workflow use
:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsUpdateWorkflowOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
:language: python
:dedent: 4
:start-after: [START how_to_update_workflow]
@@ -79,7 +79,7 @@ Get workflow
To get a workflow use
:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsGetWorkflowOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
:language: python
:dedent: 4
:start-after: [START how_to_get_workflow]
@@ -93,7 +93,7 @@ List workflows
To list workflows use
:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsListWorkflowsOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
:language: python
:dedent: 4
:start-after: [START how_to_list_workflows]
@@ -107,7 +107,7 @@ Delete workflow
To delete a workflow use
:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsDeleteWorkflowOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
:language: python
:dedent: 4
:start-after: [START how_to_delete_workflow]
@@ -122,7 +122,7 @@ To create an execution use
:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCreateExecutionOperator`.
This operator is not idempotent due to API limitation.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
:language: python
:dedent: 4
:start-after: [START how_to_create_execution]
@@ -131,7 +131,7 @@ This operator is not idempotent due to API limitation.
The create operator does not wait for execution to complete. To wait for execution result use
:class:`~airflow.providers.google.cloud.sensors.workflows.WorkflowExecutionSensor`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
:language: python
:dedent: 4
:start-after: [START how_to_wait_for_execution]
@@ -145,7 +145,7 @@ Get execution
To get an execution use
:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsGetExecutionOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
:language: python
:dedent: 4
:start-after: [START how_to_get_execution]
@@ -160,7 +160,7 @@ To list executions use
:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsListExecutionsOperator`.
By default this operator will return only executions for last 60 minutes.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
:language: python
:dedent: 4
:start-after: [START how_to_list_executions]
@@ -174,7 +174,7 @@ Cancel execution
To cancel an execution use
:class:`~airflow.providers.google.cloud.operators.workflows.WorkflowsCancelExecutionOperator`.
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_workflows.py
+.. exampleinclude:: /../../tests/system/providers/google/workflows/example_workflows.py
:language: python
:dedent: 4
:start-after: [START how_to_cancel_execution]
diff --git a/tests/providers/google/cloud/operators/test_workflows.py b/tests/providers/google/cloud/operators/test_workflows.py
index 5578548ffb..0ba4a64298 100644
--- a/tests/providers/google/cloud/operators/test_workflows.py
+++ b/tests/providers/google/cloud/operators/test_workflows.py
@@ -64,7 +64,8 @@ class TestWorkflowsCreateWorkflowOperator:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- result = op.execute({})
+ context = mock.MagicMock()
+ result = op.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -100,7 +101,8 @@ class TestWorkflowsUpdateWorkflowOperator:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- result = op.execute({})
+ context = mock.MagicMock()
+ result = op.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -181,7 +183,8 @@ class TestWorkflowsListWorkflowsOperator:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- result = op.execute({})
+ context = mock.MagicMock()
+ result = op.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -216,7 +219,8 @@ class TestWorkflowsGetWorkflowOperator:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- result = op.execute({})
+ context = mock.MagicMock()
+ result = op.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -253,7 +257,8 @@ class TestWorkflowExecutionsCreateExecutionOperator:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- result = op.execute({})
+ context = mock.MagicMock()
+ result = op.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -269,7 +274,16 @@ class TestWorkflowExecutionsCreateExecutionOperator:
timeout=TIMEOUT,
metadata=METADATA,
)
- mock_xcom.assert_called_once_with({}, key="execution_id", value="execution_id")
+ mock_xcom.assert_called_with(
+ context,
+ key="workflow_execution",
+ value={
+ 'location_id': LOCATION,
+ 'workflow_id': WORKFLOW_ID,
+ 'execution_id': EXECUTION_ID,
+ 'project_id': PROJECT_ID,
+ },
+ )
assert result == mock_object.to_dict.return_value
@@ -289,7 +303,8 @@ class TestWorkflowExecutionsCancelExecutionOperator:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- result = op.execute({})
+ context = mock.MagicMock()
+ result = op.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -328,7 +343,8 @@ class TestWorkflowExecutionsListExecutionsOperator:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- result = op.execute({})
+ context = mock.MagicMock()
+ result = op.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
@@ -363,7 +379,8 @@ class TestWorkflowExecutionsGetExecutionOperator:
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
)
- result = op.execute({})
+ context = mock.MagicMock()
+ result = op.execute(context=context)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID,
diff --git a/tests/providers/google/cloud/operators/test_workflows_system.py b/tests/providers/google/cloud/operators/test_workflows_system.py
deleted file mode 100644
index 300552b048..0000000000
--- a/tests/providers/google/cloud/operators/test_workflows_system.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-import pytest
-
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_WORKFLOWS_KEY
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
-@pytest.mark.system("google.cloud")
-@pytest.mark.credential_file(GCP_WORKFLOWS_KEY)
-class WorkflowsExampleDagsSystemTest(GoogleSystemTest):
- def setUp(self):
- super().setUp()
-
- @provide_gcp_context(GCP_WORKFLOWS_KEY)
- def test_run_example_workflow_dag(self):
- self.run_dag('example_cloud_workflows', CLOUD_DAG_FOLDER)
-
- def tearDown(self):
- super().tearDown()
diff --git a/airflow/providers/google/cloud/example_dags/example_workflows.py b/tests/system/providers/google/workflows/example_workflows.py
similarity index 78%
rename from airflow/providers/google/cloud/example_dags/example_workflows.py
rename to tests/system/providers/google/workflows/example_workflows.py
index e2ca88cdf3..47ca818bf8 100644
--- a/airflow/providers/google/cloud/example_dags/example_workflows.py
+++ b/tests/system/providers/google/workflows/example_workflows.py
@@ -33,11 +33,15 @@ from airflow.providers.google.cloud.operators.workflows import (
WorkflowsUpdateWorkflowOperator,
)
from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor
+from airflow.utils.trigger_rule import TriggerRule
-LOCATION = os.environ.get("GCP_WORKFLOWS_LOCATION", "us-central1")
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
-WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_WORKFLOW_ID", "airflow-test-workflow")
+DAG_ID = "cloud_workflows"
+
+LOCATION = "us-central1"
+WORKFLOW_ID = f"workflow-{DAG_ID}-{ENV_ID}"
# [START how_to_define_workflow]
WORKFLOW_CONTENT = """
@@ -67,7 +71,7 @@ WORKFLOW = {
EXECUTION = {"argument": ""}
-SLEEP_WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_SLEEP_WORKFLOW_ID", "sleep_workflow")
+SLEEP_WORKFLOW_ID = f"sleep-workflow-{DAG_ID}-{ENV_ID}"
SLEEP_WORKFLOW_CONTENT = """
- someSleep:
call: sys.sleep
@@ -83,7 +87,7 @@ SLEEP_WORKFLOW = {
with DAG(
- "example_cloud_workflows",
+ DAG_ID,
schedule_interval='@once',
start_date=datetime(2021, 1, 1),
catchup=False,
@@ -99,8 +103,8 @@ with DAG(
# [END how_to_create_workflow]
# [START how_to_update_workflow]
- update_workflows = WorkflowsUpdateWorkflowOperator(
- task_id="update_workflows",
+ update_workflow = WorkflowsUpdateWorkflowOperator(
+ task_id="update_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
@@ -127,6 +131,7 @@ with DAG(
task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
# [END how_to_delete_workflow]
+ delete_workflow.trigger_rule = TriggerRule.ALL_DONE
# [START how_to_create_execution]
create_execution = WorkflowsCreateExecutionOperator(
@@ -182,21 +187,36 @@ with DAG(
workflow_id=SLEEP_WORKFLOW_ID,
)
+ cancel_execution_id = create_execution_for_cancel.output["execution_id"]
+
# [START how_to_cancel_execution]
cancel_execution = WorkflowsCancelExecutionOperator(
task_id="cancel_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=SLEEP_WORKFLOW_ID,
- execution_id=create_execution_id,
+ execution_id=cancel_execution_id,
)
# [END how_to_cancel_execution]
- create_workflow >> update_workflows >> [get_workflow, list_workflows]
- update_workflows >> [create_execution, create_execution_for_cancel]
+ delete_workflow_for_cancel = WorkflowsDeleteWorkflowOperator(
+ task_id="delete_workflow_for_cancel",
+ location=LOCATION,
+ project_id=PROJECT_ID,
+ workflow_id=SLEEP_WORKFLOW_ID,
+ trigger_rule=TriggerRule.ALL_DONE,
+ )
+
+ create_workflow >> update_workflow >> [get_workflow, list_workflows]
+ update_workflow >> [create_execution, create_execution_for_cancel]
wait_for_execution >> [get_execution, list_executions]
- create_workflow_for_cancel >> create_execution_for_cancel >> cancel_execution
+ (
+ create_workflow_for_cancel
+ >> create_execution_for_cancel
+ >> cancel_execution
+ >> delete_workflow_for_cancel
+ )
[cancel_execution, list_executions] >> delete_workflow
@@ -205,7 +225,16 @@ with DAG(
# create_execution >> get_execution
# create_execution >> cancel_execution
+ # ### Everything below this line is not part of example ###
+ # ### Just for system tests purpose ###
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
-if __name__ == '__main__':
- dag.clear(dag_run_state=None)
- dag.run()
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)