You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/31 05:14:18 UTC

[airflow] branch main updated: Migration of System Tests: Cloud Composer (AIP-47) (#27227)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new eb8c0cf0d2 Migration of System Tests: Cloud Composer (AIP-47)  (#27227)
eb8c0cf0d2 is described below

commit eb8c0cf0d2c657824f666e874ec4e21940931ea4
Author: Beata Kossakowska <10...@users.noreply.github.com>
AuthorDate: Mon Oct 31 06:14:11 2022 +0100

    Migration of System Tests: Cloud Composer (AIP-47)  (#27227)
---
 .../google/cloud/operators/cloud_composer.py       |   2 +
 .../google/cloud/sensors/cloud_composer.py         |  99 ++++++++++++++++++
 .../google/cloud/triggers/cloud_composer.py        |   3 +-
 airflow/providers/google/provider.yaml             |   3 +
 .../operators/cloud/cloud_composer.rst             |  22 ++--
 .../cloud/operators/test_cloud_composer_system.py  |  46 ---------
 .../google/cloud/sensors/test_cloud_composer.py    |  73 ++++++++++++++
 .../providers/google/cloud/composer/__init__.py    |  16 +++
 .../cloud/composer}/example_cloud_composer.py      |  70 ++++---------
 .../composer/example_cloud_composer_deferrable.py  | 112 +++++++--------------
 10 files changed, 265 insertions(+), 181 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/cloud_composer.py b/airflow/providers/google/cloud/operators/cloud_composer.py
index eaffecaa01..1a245d2eab 100644
--- a/airflow/providers/google/cloud/operators/cloud_composer.py
+++ b/airflow/providers/google/cloud/operators/cloud_composer.py
@@ -180,6 +180,8 @@ class CloudComposerCreateEnvironmentOperator(BaseOperator):
                 timeout=self.timeout,
                 metadata=self.metadata,
             )
+            context["ti"].xcom_push(key="operation_id", value=result.operation.name)
+
             if not self.deferrable:
                 environment = hook.wait_for_operation(timeout=self.timeout, operation=result)
                 return Environment.to_dict(environment)
diff --git a/airflow/providers/google/cloud/sensors/cloud_composer.py b/airflow/providers/google/cloud/sensors/cloud_composer.py
new file mode 100644
index 0000000000..ad4ae92331
--- /dev/null
+++ b/airflow/providers/google/cloud/sensors/cloud_composer.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Cloud Composer sensor."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.triggers.cloud_composer import CloudComposerExecutionTrigger
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class CloudComposerEnvironmentSensor(BaseSensorOperator):
+    """
+    Defer until the given Cloud Composer Environment operation is reported as done.
+
+    :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
+    :param operation_name: The name of the operation resource
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :param pooling_period_seconds: Optional. Controls the polling rate for the result of the deferrable run.
+    """
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        operation_name: str,
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: str | None = None,
+        impersonation_chain: str | Sequence[str] | None = None,
+        pooling_period_seconds: int = 30,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.operation_name = operation_name
+        self.pooling_period_seconds = pooling_period_seconds
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Context) -> None:
+        """Airflow runs this method on the worker and defers using the trigger."""
+        self.defer(
+            trigger=CloudComposerExecutionTrigger(
+                project_id=self.project_id,
+                region=self.region,
+                operation_name=self.operation_name,
+                gcp_conn_id=self.gcp_conn_id,
+                impersonation_chain=self.impersonation_chain,
+                delegate_to=self.delegate_to,
+                pooling_period_seconds=self.pooling_period_seconds,
+            ),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: dict[str, Any], event: dict[str, str] | None = None) -> str:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Returns the event's "operation_done" value when it is truthy; otherwise raises
+        AirflowException with the event's "message", or a fixed error if no event arrived.
+        """
+        if event:
+            if event.get("operation_done"):
+                return event["operation_done"]
+            raise AirflowException(event["message"])
+        raise AirflowException("No event received in trigger callback")
diff --git a/airflow/providers/google/cloud/triggers/cloud_composer.py b/airflow/providers/google/cloud/triggers/cloud_composer.py
index 9a745e7819..20613b70ab 100644
--- a/airflow/providers/google/cloud/triggers/cloud_composer.py
+++ b/airflow/providers/google/cloud/triggers/cloud_composer.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 from __future__ import annotations
 
 import asyncio
@@ -42,11 +43,9 @@ class CloudComposerExecutionTrigger(BaseTrigger):
         self.project_id = project_id
         self.region = region
         self.operation_name = operation_name
-
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
         self.delegate_to = delegate_to
-
         self.pooling_period_seconds = pooling_period_seconds
 
         self.gcp_hook = CloudComposerAsyncHook(
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 10c47c7d54..3e105f13e1 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -595,6 +595,9 @@ sensors:
   - integration-name: Google Bigtable
     python-modules:
       - airflow.providers.google.cloud.sensors.bigtable
+  - integration-name: Google Cloud Composer
+    python-modules:
+      - airflow.providers.google.cloud.sensors.cloud_composer
   - integration-name: Google Cloud Storage Transfer Service
     python-modules:
       - airflow.providers.google.cloud.sensors.cloud_storage_transfer_service
diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
index 14a29e9361..ada347def7 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_composer.rst
@@ -39,7 +39,7 @@ For more information about the available fields to pass when creating a environm
 
 A simple environment configuration can look as followed:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
     :language: python
     :dedent: 0
     :start-after: [START howto_operator_composer_simple_environment]
@@ -48,7 +48,7 @@ A simple environment configuration can look as followed:
 With this configuration we can create the environment:
 :class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_create_composer_environment]
@@ -57,7 +57,7 @@ With this configuration we can create the environment:
 or you can define the same operator in the deferrable mode:
 :class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_create_composer_environment_deferrable_mode]
@@ -70,7 +70,7 @@ To get a environment you can use:
 
 :class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerGetEnvironmentOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_get_composer_environment]
@@ -83,7 +83,7 @@ To get a environment you can use:
 
 :class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerListEnvironmentsOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_list_composer_environments]
@@ -98,7 +98,7 @@ For more information on updateMask and other parameters take a look at `Cloud Co
 
 An example of a new service config and the updateMask:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
     :language: python
     :dedent: 0
     :start-after: [START howto_operator_composer_update_environment]
@@ -107,7 +107,7 @@ An example of a new service config and the updateMask:
 To update a service you can use:
 :class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerUpdateEnvironmentOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_update_composer_environment]
@@ -116,7 +116,7 @@ To update a service you can use:
 or you can define the same operator in the deferrable mode:
 :class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerCreateEnvironmentOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_update_composer_environment_deferrable_mode]
@@ -129,7 +129,7 @@ To delete a service you can use:
 
 :class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerDeleteEnvironmentOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_delete_composer_environment]
@@ -138,7 +138,7 @@ To delete a service you can use:
 or you can define the same operator in the deferrable mode:
 :class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerDeleteEnvironmentOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_delete_composer_environment_deferrable_mode]
@@ -152,7 +152,7 @@ You can also list all supported Cloud Composer images:
 
 :class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerListImageVersionsOperator`
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/composer/example_cloud_composer.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_composer_image_list]
diff --git a/tests/providers/google/cloud/operators/test_cloud_composer_system.py b/tests/providers/google/cloud/operators/test_cloud_composer_system.py
deleted file mode 100644
index 602d0d1dee..0000000000
--- a/tests/providers/google/cloud/operators/test_cloud_composer_system.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""System tests for Google Cloud Composer operators"""
-from __future__ import annotations
-
-import pytest
-
-from tests.providers.google.cloud.utils.gcp_authenticator import GCP_CLOUD_COMPOSER
-from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
-
-
-@pytest.mark.backend("mysql", "postgres")
-@pytest.mark.credential_file(GCP_CLOUD_COMPOSER)
-class CloudComposerSystemTest(GoogleSystemTest):
-    """
-    System tests for Google Cloud Composer operators
-
-    It use a real service.
-    """
-
-    @provide_gcp_context(GCP_CLOUD_COMPOSER)
-    def setUp(self):
-        super().setUp()
-
-    @provide_gcp_context(GCP_CLOUD_COMPOSER)
-    def test_run_example_dag_composer(self):
-        self.run_dag("composer_dag1", CLOUD_DAG_FOLDER)
-
-    @provide_gcp_context(GCP_CLOUD_COMPOSER)
-    def tearDown(self):
-        super().tearDown()
diff --git a/tests/providers/google/cloud/sensors/test_cloud_composer.py b/tests/providers/google/cloud/sensors/test_cloud_composer.py
new file mode 100644
index 0000000000..c00521c1eb
--- /dev/null
+++ b/tests/providers/google/cloud/sensors/test_cloud_composer.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest import TestCase, mock
+
+import pytest
+
+from airflow.exceptions import AirflowException, TaskDeferred
+from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerEnvironmentSensor
+from airflow.providers.google.cloud.triggers.cloud_composer import CloudComposerExecutionTrigger
+
+TEST_PROJECT_ID = "test_project_id"
+TEST_OPERATION_NAME = "test_operation_name"
+TEST_REGION = "region"
+
+
+class TestCloudComposerEnvironmentSensor(TestCase):
+    def test_cloud_composer_existence_sensor_async(self):
+        """
+        Asserts that a task is deferred and a CloudComposerExecutionTrigger will be fired
+        when the CloudComposerEnvironmentSensor is executed.
+        """
+        task = CloudComposerEnvironmentSensor(
+            task_id="task_id",
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            operation_name=TEST_OPERATION_NAME,
+        )
+        with pytest.raises(TaskDeferred) as exc:
+            task.execute(context={})
+        assert isinstance(
+            exc.value.trigger, CloudComposerExecutionTrigger
+        ), "Trigger is not a CloudComposerExecutionTrigger"
+
+    def test_cloud_composer_existence_sensor_async_execute_failure(self):
+        """Tests that an AirflowException is raised when no event is received."""
+        task = CloudComposerEnvironmentSensor(
+            task_id="task_id",
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            operation_name=TEST_OPERATION_NAME,
+        )
+        with pytest.raises(AirflowException, match="No event received in trigger callback"):
+            task.execute_complete(context={}, event=None)
+
+    def test_cloud_composer_existence_sensor_async_execute_complete(self):
+        """Asserts that execute_complete does not raise when the operation is done"""
+        task = CloudComposerEnvironmentSensor(
+            task_id="task_id",
+            project_id=TEST_PROJECT_ID,
+            region=TEST_REGION,
+            operation_name=TEST_OPERATION_NAME,
+        )
+        with mock.patch.object(task.log, "info"):
+            task.execute_complete(
+                context={}, event={"operation_done": True, "operation_name": TEST_OPERATION_NAME}
+            )
diff --git a/tests/system/providers/google/cloud/composer/__init__.py b/tests/system/providers/google/cloud/composer/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/system/providers/google/cloud/composer/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_composer.py b/tests/system/providers/google/cloud/composer/example_cloud_composer.py
similarity index 66%
copy from airflow/providers/google/cloud/example_dags/example_cloud_composer.py
copy to tests/system/providers/google/cloud/composer/example_cloud_composer.py
index 59cdbbd5d6..8ffa8774d4 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+++ b/tests/system/providers/google/cloud/composer/example_cloud_composer.py
@@ -30,16 +30,22 @@ from airflow.providers.google.cloud.operators.cloud_composer import (
     CloudComposerListImageVersionsOperator,
     CloudComposerUpdateEnvironmentOperator,
 )
+from airflow.utils.trigger_rule import TriggerRule
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<PROJECT_ID>")
-REGION = os.environ.get("GCP_REGION", "<REGION>")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+DAG_ID = "example_composer"
+
+REGION = "us-central1"
 
 # [START howto_operator_composer_simple_environment]
-ENVIRONMENT_ID = os.environ.get("ENVIRONMENT_ID", "ENVIRONMENT_ID>")
+
+ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}"
+
 ENVIRONMENT = {
     "config": {
-        "node_count": 3,
-        "software_config": {"image_version": "composer-1.17.7-airflow-2.1.4"},
+        "software_config": {"image_version": "composer-2.0.28-airflow-2.2.5"},
     }
 }
 # [END howto_operator_composer_simple_environment]
@@ -55,10 +61,11 @@ UPDATE_MASK = {"paths": ["labels.label1"]}
 
 
 with models.DAG(
-    "composer_dag1",
+    DAG_ID,
+    schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "composer"],
 ) as dag:
     # [START howto_operator_composer_image_list]
     image_versions = CloudComposerListImageVersionsOperator(
@@ -112,51 +119,18 @@ with models.DAG(
         environment_id=ENVIRONMENT_ID,
     )
     # [END howto_operator_delete_composer_environment]
+    delete_env.trigger_rule = TriggerRule.ALL_DONE
 
     chain(image_versions, create_env, list_envs, get_env, update_env, delete_env)
 
+    from tests.system.utils.watcher import watcher
 
-with models.DAG(
-    "composer_dag_deferrable1",
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=["example"],
-) as defer_dag:
-    # [START howto_operator_create_composer_environment_deferrable_mode]
-    defer_create_env = CloudComposerCreateEnvironmentOperator(
-        task_id="defer_create_env",
-        project_id=PROJECT_ID,
-        region=REGION,
-        environment_id=ENVIRONMENT_ID,
-        environment=ENVIRONMENT,
-        deferrable=True,
-    )
-    # [END howto_operator_create_composer_environment_deferrable_mode]
+    # This test needs watcher in order to properly mark success/failure
+    # when "teardown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
 
-    # [START howto_operator_update_composer_environment_deferrable_mode]
-    defer_update_env = CloudComposerUpdateEnvironmentOperator(
-        task_id="defer_update_env",
-        project_id=PROJECT_ID,
-        region=REGION,
-        environment_id=ENVIRONMENT_ID,
-        update_mask=UPDATE_MASK,
-        environment=UPDATED_ENVIRONMENT,
-        deferrable=True,
-    )
-    # [END howto_operator_update_composer_environment_deferrable_mode]
 
-    # [START howto_operator_delete_composer_environment_deferrable_mode]
-    defer_delete_env = CloudComposerDeleteEnvironmentOperator(
-        task_id="defer_delete_env",
-        project_id=PROJECT_ID,
-        region=REGION,
-        environment_id=ENVIRONMENT_ID,
-        deferrable=True,
-    )
-    # [END howto_operator_delete_composer_environment_deferrable_mode]
+from tests.system.utils import get_test_run  # noqa: E402
 
-    chain(
-        defer_create_env,
-        defer_update_env,
-        defer_delete_env,
-    )
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_composer.py b/tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
similarity index 54%
rename from airflow/providers/google/cloud/example_dags/example_cloud_composer.py
rename to tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
index 59cdbbd5d6..c7cfd2a090 100644
--- a/airflow/providers/google/cloud/example_dags/example_cloud_composer.py
+++ b/tests/system/providers/google/cloud/composer/example_cloud_composer_deferrable.py
@@ -25,21 +25,23 @@ from airflow.models.baseoperator import chain
 from airflow.providers.google.cloud.operators.cloud_composer import (
     CloudComposerCreateEnvironmentOperator,
     CloudComposerDeleteEnvironmentOperator,
-    CloudComposerGetEnvironmentOperator,
-    CloudComposerListEnvironmentsOperator,
-    CloudComposerListImageVersionsOperator,
     CloudComposerUpdateEnvironmentOperator,
 )
+from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerEnvironmentSensor
+from airflow.utils.trigger_rule import TriggerRule
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<PROJECT_ID>")
-REGION = os.environ.get("GCP_REGION", "<REGION>")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
+DAG_ID = "example_composer_deferrable"
+
+REGION = "us-central1"
+
+ENVIRONMENT_ID = f"test-{DAG_ID}-{ENV_ID}"
 # [START howto_operator_composer_simple_environment]
-ENVIRONMENT_ID = os.environ.get("ENVIRONMENT_ID", "ENVIRONMENT_ID>")
 ENVIRONMENT = {
     "config": {
-        "node_count": 3,
-        "software_config": {"image_version": "composer-1.17.7-airflow-2.1.4"},
+        "software_config": {"image_version": "composer-2.0.28-airflow-2.2.5"},
     }
 }
 # [END howto_operator_composer_simple_environment]
@@ -47,81 +49,20 @@ ENVIRONMENT = {
 # [START howto_operator_composer_update_environment]
 UPDATED_ENVIRONMENT = {
     "labels": {
-        "label1": "testing",
+        "label2": "testing",
     }
 }
-UPDATE_MASK = {"paths": ["labels.label1"]}
+UPDATE_MASK = {"paths": ["labels.label2"]}
 # [END howto_operator_composer_update_environment]
 
 
 with models.DAG(
-    "composer_dag1",
+    DAG_ID,
+    schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example"],
+    tags=["example", "composer"],
 ) as dag:
-    # [START howto_operator_composer_image_list]
-    image_versions = CloudComposerListImageVersionsOperator(
-        task_id="image_versions",
-        project_id=PROJECT_ID,
-        region=REGION,
-    )
-    # [END howto_operator_composer_image_list]
-
-    # [START howto_operator_create_composer_environment]
-    create_env = CloudComposerCreateEnvironmentOperator(
-        task_id="create_env",
-        project_id=PROJECT_ID,
-        region=REGION,
-        environment_id=ENVIRONMENT_ID,
-        environment=ENVIRONMENT,
-    )
-    # [END howto_operator_create_composer_environment]
-
-    # [START howto_operator_list_composer_environments]
-    list_envs = CloudComposerListEnvironmentsOperator(
-        task_id="list_envs", project_id=PROJECT_ID, region=REGION
-    )
-    # [END howto_operator_list_composer_environments]
-
-    # [START howto_operator_get_composer_environment]
-    get_env = CloudComposerGetEnvironmentOperator(
-        task_id="get_env",
-        project_id=PROJECT_ID,
-        region=REGION,
-        environment_id=ENVIRONMENT_ID,
-    )
-    # [END howto_operator_get_composer_environment]
-
-    # [START howto_operator_update_composer_environment]
-    update_env = CloudComposerUpdateEnvironmentOperator(
-        task_id="update_env",
-        project_id=PROJECT_ID,
-        region=REGION,
-        environment_id=ENVIRONMENT_ID,
-        update_mask=UPDATE_MASK,
-        environment=UPDATED_ENVIRONMENT,
-    )
-    # [END howto_operator_update_composer_environment]
-
-    # [START howto_operator_delete_composer_environment]
-    delete_env = CloudComposerDeleteEnvironmentOperator(
-        task_id="delete_env",
-        project_id=PROJECT_ID,
-        region=REGION,
-        environment_id=ENVIRONMENT_ID,
-    )
-    # [END howto_operator_delete_composer_environment]
-
-    chain(image_versions, create_env, list_envs, get_env, update_env, delete_env)
-
-
-with models.DAG(
-    "composer_dag_deferrable1",
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=["example"],
-) as defer_dag:
     # [START howto_operator_create_composer_environment_deferrable_mode]
     defer_create_env = CloudComposerCreateEnvironmentOperator(
         task_id="defer_create_env",
@@ -133,6 +74,15 @@ with models.DAG(
     )
     # [END howto_operator_create_composer_environment_deferrable_mode]
 
+    operation_name = defer_create_env.output["operation_id"]
+
+    wait_for_execution = CloudComposerEnvironmentSensor(
+        task_id="wait_for_execution",
+        operation_name=operation_name,
+        region=REGION,
+        project_id=PROJECT_ID,
+    )
+
     # [START howto_operator_update_composer_environment_deferrable_mode]
     defer_update_env = CloudComposerUpdateEnvironmentOperator(
         task_id="defer_update_env",
@@ -154,9 +104,23 @@ with models.DAG(
         deferrable=True,
     )
     # [END howto_operator_delete_composer_environment_deferrable_mode]
+    defer_delete_env.trigger_rule = TriggerRule.ALL_DONE
 
     chain(
         defer_create_env,
+        wait_for_execution,
         defer_update_env,
         defer_delete_env,
     )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "teardown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)