You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/31 05:14:18 UTC
[airflow] branch main updated: Migration of System Tests: Cloud Composer (AIP-47) (#27227)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new eb8c0cf0d2 Migration of System Tests: Cloud Composer (AIP-47) (#27227)
eb8c0cf0d2 is described below
commit eb8c0cf0d2c657824f666e874ec4e21940931ea4
Author: Beata Kossakowska <10...@users.noreply.github.com>
AuthorDate: Mon Oct 31 06:14:11 2022 +0100
Migration of System Tests: Cloud Composer (AIP-47) (#27227)
---
.../google/cloud/operators/cloud_composer.py | 2 +
.../google/cloud/sensors/cloud_composer.py | 99 ++++++++++++++++++
.../google/cloud/triggers/cloud_composer.py | 3 +-
airflow/providers/google/provider.yaml | 3 +
.../operators/cloud/cloud_composer.rst | 22 ++--
.../cloud/operators/test_cloud_composer_system.py | 46 ---------
.../google/cloud/sensors/test_cloud_composer.py | 73 ++++++++++++++
.../providers/google/cloud/composer/__init__.py | 16 +++
.../cloud/composer}/example_cloud_composer.py | 70 ++++---------
.../composer/example_cloud_composer_deferrable.py | 112 +++++++--------------
10 files changed, 265 insertions(+), 181 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/cloud_composer.py b/airflow/providers/google/cloud/operators/cloud_composer.py
index eaffecaa01..1a245d2eab 100644
--- a/airflow/providers/google/cloud/operators/cloud_composer.py
+++ b/airflow/providers/google/cloud/operators/cloud_composer.py
@@ -180,6 +180,8 @@ class CloudComposerCreateEnvironmentOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
+ context["ti"].xcom_push(key="operation_id", value=result.operation.name)
+
if not self.deferrable:
environment = hook.wait_for_operation(timeout=self.timeout, operation=result)
return Environment.to_dict(environment)
diff --git a/airflow/providers/google/cloud/sensors/cloud_composer.py b/airflow/providers/google/cloud/sensors/cloud_composer.py
new file mode 100644
index 0000000000..ad4ae92331
--- /dev/null
+++ b/airflow/providers/google/cloud/sensors/cloud_composer.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Cloud Composer sensor."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.triggers.cloud_composer import CloudComposerExecutionTrigger
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+
+class CloudComposerEnvironmentSensor(BaseSensorOperator):
+ """
+ Check the status of the Cloud Composer Environment task
+
+ :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+ :param region: Required. The ID of the Google Cloud region that the service belongs to.
+ :param operation_name: The name of the operation resource
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+ request must have domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using short-term
+ credentials, or chained list of accounts required to get the access_token
+ of the last account in the list, which will be impersonated in the request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding identity, with first
+ account from the list granting this role to the originating account (templated).
+ :param pooling_period_seconds: Optional: Control the rate of the poll for the result of deferrable run.
+ """
+
+ def __init__(
+ self,
+ *,
+ project_id: str,
+ region: str,
+ operation_name: str,
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: str | None = None,
+ impersonation_chain: str | Sequence[str] | None = None,
+ pooling_period_seconds: int = 30,
+ **kwargs,
+ ):
+ super().__init__(**kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.operation_name = operation_name
+ self.pooling_period_seconds = pooling_period_seconds
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: Context) -> None:
+ """Airflow runs this method on the worker and defers using the trigger."""
+ self.defer(
+ trigger=CloudComposerExecutionTrigger(
+ project_id=self.project_id,
+ region=self.region,
+ operation_name=self.operation_name,
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ delegate_to=self.delegate_to,
+ pooling_period_seconds=self.pooling_period_seconds,
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context: dict[str, Any], event: dict[str, str] | None = None) -> str:
+ """
+ Callback for when the trigger fires - returns immediately.
+ Relies on trigger to throw an exception, otherwise it assumes execution was
+ successful.
+ """
+ if event:
+ if event.get("operation_done"):
+ return event["operation_done"]
+ raise AirflowException(event["message"])
+ raise AirflowException("No event received in trigger callback")
diff --git a/airflow/providers/google/cloud/triggers/cloud_composer.py b/airflow/providers/google/cloud/triggers/cloud_composer.py
index 9a745e7819..20613b70ab 100644
--- a/airflow/providers/google/cloud/triggers/cloud_composer.py
+++ b/airflow/providers/google/cloud/triggers/cloud_composer.py
@@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
from __future__ import annotations
import asyncio
@@ -42,11 +43,9 @@ class CloudComposerExecutionTrigger(BaseTrigger):
self.project_id = project_id
self.region = region
self.operation_name = operation_name
-
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
self.delegate_to = delegate_to
-
self.pooling_period_seconds = pooling_period_seconds
self.gcp_hook = CloudComposerAsyncHook(
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 10c47c7d54..3e105f13e1 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -595,6 +595,9 @@ sensors:
- integration-name: Google Bigtable
python-modules:
- airflow.providers.google.cloud.sensors.bigtable
+ - integration-name: Google Cloud Composer
+ python-modules:
+ - airflow.providers.google.cloud.sensors.cloud_composer
- integration-name: Google Cloud Storage Transfer Service
python-modules:
- airflow.providers.google.cloud.sensors.cloud_storage_transfer_service
diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
index 14a29e9361..ada347def7 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
@@ -39,7 +39,7 @@ For more information about the available fields to pass when creating a environm
A simple environment configuration can look as followed:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 0
:start-after: [START howto_operator_composer_simple_environment]
@@ -48,7 +48,7 @@ A simple environment configuration can look as followed:
With this configuration we can create the environment:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START howto_operator_create_composer_environment]
@@ -57,7 +57,7 @@ With this configuration we can create the environment:
or you can define the same operator in the deferrable mode:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
:language: python
:dedent: 4
:start-after: [START howto_operator_create_composer_environment_deferrable_mode]
@@ -70,7 +70,7 @@ To get a environment you can use:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerGetEnvironmentOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START howto_operator_get_composer_environment]
@@ -83,7 +83,7 @@ To get a environment you can use:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerListEnvironmentsOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START howto_operator_list_composer_environments]
@@ -98,7 +98,7 @@ For more information on updateMask and other parameters take a look at `Cloud Co
An example of a new service config and the updateMask:
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 0
:start-after: [START howto_operator_composer_update_environment]
@@ -107,7 +107,7 @@ An example of a new service config and the updateMask:
To update a service you can use:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerUpdateEnvironmentOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START howto_operator_update_composer_environment]
@@ -116,7 +116,7 @@ To update a service you can use:
or you can define the same operator in the deferrable mode:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
:language: python
:dedent: 4
:start-after: [START howto_operator_update_composer_environment_deferrable_mode]
@@ -129,7 +129,7 @@ To delete a service you can use:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerDeleteEnvironmentOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START howto_operator_delete_composer_environment]
@@ -138,7 +138,7 @@ To delete a service you can use:
or you can define the same operator in the deferrable mode:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerDeleteEnvironmentOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
:language: python
:dedent: 4
:start-after: [START howto_operator_delete_composer_environment_deferrable_mode]
@@ -152,7 +152,7 @@ You can also list all supported Cloud Composer images:
:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerListImageVersionsOperator`
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
:language: python
:dedent: 4
:start-after: [START howto_operator_composer_image_list]
diff --git a/tests/providers/google/cloud/operators/test_cloud_composer_system.py b/tests/providers/google/cloud/operators/test_cloud_composer_system.py
deleted file mode 100644
index 602d0d1dee..0000000000
--- a/tests/providers/google/cloud/operators/test_cloud_composer_system.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""System tests for Google Cloud Composer operators"""
-from __future__ import annotations
-
-import pytest
-
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_CLOUD_COMPOSER
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_CLOUD_COMPOSER)
-class CloudComposerSystemTest(GoogleSystemTest):
- """
- System tests for Google Cloud Composer operators
-
- It use a real service.
- """
-
- @provide_gcp_context(GCP_CLOUD_COMPOSER)
- def setUp(self):
- super().setUp()
-
- @provide_gcp_context(GCP_CLOUD_COMPOSER)
- def test_run_example_dag_composer(self):
- self.run_dag("composer_dag1", CLOUD_DAG_FOLDER)
-
- @provide_gcp_context(GCP_CLOUD_COMPOSER)
- def tearDown(self):
- super().tearDown()
diff --git a/tests/providers/google/cloud/sensors/test_cloud_composer.py b/tests/providers/google/cloud/sensors/test_cloud_composer.py
new file mode 100644
index 0000000000..c00521c1eb
--- /dev/null
+++ b/tests/providers/google/cloud/sensors/test_cloud_composer.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest import TestCase, mock
+
+import pytest
+
+from airflow.exceptions import AirflowException, TaskDeferred
+from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerEnvironmentSensor
+from airflow.providers.google.cloud.triggers.cloud_composer import CloudComposerExecutionTrigger
+
+TEST_PROJECT_ID = "test_project_id"
+TEST_OPERATION_NAME = "test_operation_name"
+TEST_REGION = "region"
+
+
+class TestCloudComposerEnvironmentSensor(TestCase):
+ def test_cloud_composer_existence_sensor_async(self):
+ """
+ Asserts that a task is deferred and a CloudComposerExecutionTrigger will be fired
+ when the CloudComposerEnvironmentSensor is executed.
+ """
+ task = CloudComposerEnvironmentSensor(
+ task_id="task_id",
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ operation_name=TEST_OPERATION_NAME,
+ )
+ with pytest.raises(TaskDeferred) as exc:
+ task.execute(context={})
+ assert isinstance(
+ exc.value.trigger, CloudComposerExecutionTrigger
+ ), "Trigger is not a CloudComposerExecutionTrigger"
+
+ def test_cloud_composer_existence_sensor_async_execute_failure(self):
+ """Tests that an AirflowException is raised in case of error event."""
+ task = CloudComposerEnvironmentSensor(
+ task_id="task_id",
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ operation_name=TEST_OPERATION_NAME,
+ )
+ with pytest.raises(AirflowException, match="No event received in trigger callback"):
+ task.execute_complete(context={}, event=None)
+
+ def test_cloud_composer_existence_sensor_async_execute_complete(self):
+ """Asserts that logging occurs as expected"""
+ task = CloudComposerEnvironmentSensor(
+ task_id="task_id",
+ project_id=TEST_PROJECT_ID,
+ region=TEST_REGION,
+ operation_name=TEST_OPERATION_NAME,
+ )
+ with mock.patch.object(task.log, "info"):
+ task.execute_complete(
+ context={}, event={"operation_done": True, "operation_name": TEST_OPERATION_NAME}
+ )
diff --git a/tests/system/providers/google/cloud/composer/__init__.py b/tests/system/providers/google/cloud/composer/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/composer/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_composer.py b/tests/system/providers/google/cloud/composer/example_cloud_composer.py
similarity index 66%
copy from airflow/providers/google/cloud/example_dags/example_cloud_composer.py
copy to tests/system/providers/google/cloud/composer/example_cloud_composer.py
index 59cdbbd5d6..8ffa8774d4 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+++ b/tests/system/providers/google/cloud/composer/example_cloud_composer.py
@@ -30,16 +30,22 @@ from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerListImageVersionsOperator,
CloudComposerUpdateEnvironmentOperator,
)
+from airflow.utils.trigger_rule import TriggerRule
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<PROJECT_ID>")
-REGION = os.environ.get("GCP_REGION", "<REGION>")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "example_composer"
+
+REGION = "us-central1"
# [START howto_operator_composer_simple_environment]
-ENVIRONMENT_ID = os.environ.get("ENVIRONMENT_ID", "ENVIRONMENT_ID>")
+
+ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}"
+
ENVIRONMENT = {
"config": {
- "node_count": 3,
- "software_config": {"image_version": "composer-1.17.7-airflow-2.1.4"},
+ "software_config": {"image_version": "composer-2.0.28-airflow-2.2.5"},
}
}
# [END howto_operator_composer_simple_environment]
@@ -55,10 +61,11 @@ UPDATE_MASK = {"paths": ["labels.label1"]}
with models.DAG(
- "composer_dag1",
+ DAG_ID,
+ schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example"],
+ tags=["example", "composer"],
) as dag:
# [START howto_operator_composer_image_list]
image_versions = CloudComposerListImageVersionsOperator(
@@ -112,51 +119,18 @@ with models.DAG(
environment_id=ENVIRONMENT_ID,
)
# [END howto_operator_delete_composer_environment]
+ delete_env.trigger_rule = TriggerRule.ALL_DONE
chain(image_versions, create_env, list_envs, get_env, update_env, delete_env)
+ from tests.system.utils.watcher import watcher
-with models.DAG(
- "composer_dag_deferrable1",
- start_date=datetime(2021, 1, 1),
- catchup=False,
- tags=["example"],
-) as defer_dag:
- # [START howto_operator_create_composer_environment_deferrable_mode]
- defer_create_env = CloudComposerCreateEnvironmentOperator(
- task_id="defer_create_env",
- project_id=PROJECT_ID,
- region=REGION,
- environment_id=ENVIRONMENT_ID,
- environment=ENVIRONMENT,
- deferrable=True,
- )
- # [END howto_operator_create_composer_environment_deferrable_mode]
+ # This test needs watcher in order to properly mark success/failure
+ # when "teardown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
- # [START howto_operator_update_composer_environment_deferrable_mode]
- defer_update_env = CloudComposerUpdateEnvironmentOperator(
- task_id="defer_update_env",
- project_id=PROJECT_ID,
- region=REGION,
- environment_id=ENVIRONMENT_ID,
- update_mask=UPDATE_MASK,
- environment=UPDATED_ENVIRONMENT,
- deferrable=True,
- )
- # [END howto_operator_update_composer_environment_deferrable_mode]
- # [START howto_operator_delete_composer_environment_deferrable_mode]
- defer_delete_env = CloudComposerDeleteEnvironmentOperator(
- task_id="defer_delete_env",
- project_id=PROJECT_ID,
- region=REGION,
- environment_id=ENVIRONMENT_ID,
- deferrable=True,
- )
- # [END howto_operator_delete_composer_environment_deferrable_mode]
+from tests.system.utils import get_test_run # noqa: E402
- chain(
- defer_create_env,
- defer_update_env,
- defer_delete_env,
- )
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_composer.py b/tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
similarity index 54%
rename from airflow/providers/google/cloud/example_dags/example_cloud_composer.py
rename to tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
index 59cdbbd5d6..c7cfd2a090 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+++ b/tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
@@ -25,21 +25,23 @@ from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerCreateEnvironmentOperator,
CloudComposerDeleteEnvironmentOperator,
- CloudComposerGetEnvironmentOperator,
- CloudComposerListEnvironmentsOperator,
- CloudComposerListImageVersionsOperator,
CloudComposerUpdateEnvironmentOperator,
)
+from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerEnvironmentSensor
+from airflow.utils.trigger_rule import TriggerRule
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<PROJECT_ID>")
-REGION = os.environ.get("GCP_REGION", "<REGION>")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_composer_deferrable"
+
+REGION = "us-central1"
+
+ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}"
# [START howto_operator_composer_simple_environment]
-ENVIRONMENT_ID = os.environ.get("ENVIRONMENT_ID", "ENVIRONMENT_ID>")
ENVIRONMENT = {
"config": {
- "node_count": 3,
- "software_config": {"image_version": "composer-1.17.7-airflow-2.1.4"},
+ "software_config": {"image_version": "composer-2.0.28-airflow-2.2.5"},
}
}
# [END howto_operator_composer_simple_environment]
@@ -47,81 +49,20 @@ ENVIRONMENT = {
# [START howto_operator_composer_update_environment]
UPDATED_ENVIRONMENT = {
"labels": {
- "label1": "testing",
+ "label2": "testing",
}
}
-UPDATE_MASK = {"paths": ["labels.label1"]}
+UPDATE_MASK = {"paths": ["labels.label2"]}
# [END howto_operator_composer_update_environment]
with models.DAG(
- "composer_dag1",
+ DAG_ID,
+ schedule="@once",
start_date=datetime(2021, 1, 1),
catchup=False,
- tags=["example"],
+ tags=["example", "composer"],
) as dag:
- # [START howto_operator_composer_image_list]
- image_versions = CloudComposerListImageVersionsOperator(
- task_id="image_versions",
- project_id=PROJECT_ID,
- region=REGION,
- )
- # [END howto_operator_composer_image_list]
-
- # [START howto_operator_create_composer_environment]
- create_env = CloudComposerCreateEnvironmentOperator(
- task_id="create_env",
- project_id=PROJECT_ID,
- region=REGION,
- environment_id=ENVIRONMENT_ID,
- environment=ENVIRONMENT,
- )
- # [END howto_operator_create_composer_environment]
-
- # [START howto_operator_list_composer_environments]
- list_envs = CloudComposerListEnvironmentsOperator(
- task_id="list_envs", project_id=PROJECT_ID, region=REGION
- )
- # [END howto_operator_list_composer_environments]
-
- # [START howto_operator_get_composer_environment]
- get_env = CloudComposerGetEnvironmentOperator(
- task_id="get_env",
- project_id=PROJECT_ID,
- region=REGION,
- environment_id=ENVIRONMENT_ID,
- )
- # [END howto_operator_get_composer_environment]
-
- # [START howto_operator_update_composer_environment]
- update_env = CloudComposerUpdateEnvironmentOperator(
- task_id="update_env",
- project_id=PROJECT_ID,
- region=REGION,
- environment_id=ENVIRONMENT_ID,
- update_mask=UPDATE_MASK,
- environment=UPDATED_ENVIRONMENT,
- )
- # [END howto_operator_update_composer_environment]
-
- # [START howto_operator_delete_composer_environment]
- delete_env = CloudComposerDeleteEnvironmentOperator(
- task_id="delete_env",
- project_id=PROJECT_ID,
- region=REGION,
- environment_id=ENVIRONMENT_ID,
- )
- # [END howto_operator_delete_composer_environment]
-
- chain(image_versions, create_env, list_envs, get_env, update_env, delete_env)
-
-
-with models.DAG(
- "composer_dag_deferrable1",
- start_date=datetime(2021, 1, 1),
- catchup=False,
- tags=["example"],
-) as defer_dag:
# [START howto_operator_create_composer_environment_deferrable_mode]
defer_create_env = CloudComposerCreateEnvironmentOperator(
task_id="defer_create_env",
@@ -133,6 +74,15 @@ with models.DAG(
)
# [END howto_operator_create_composer_environment_deferrable_mode]
+ operation_name = defer_create_env.output["operation_id"]
+
+ wait_for_execution = CloudComposerEnvironmentSensor(
+ task_id="wait_for_execution",
+ operation_name=operation_name,
+ region=REGION,
+ project_id=PROJECT_ID,
+ )
+
# [START howto_operator_update_composer_environment_deferrable_mode]
defer_update_env = CloudComposerUpdateEnvironmentOperator(
task_id="defer_update_env",
@@ -154,9 +104,23 @@ with models.DAG(
deferrable=True,
)
# [END howto_operator_delete_composer_environment_deferrable_mode]
+ defer_delete_env.trigger_rule = TriggerRule.ALL_DONE
chain(
defer_create_env,
+ wait_for_execution,
defer_update_env,
defer_delete_env,
)
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "teardown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)