You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/03/08 13:17:58 UTC

[airflow] 03/12: Add deferrable mode to CloudBuildCreateBuildOperator (#27783)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d503914b60a16622733a443540b71975edab1faf
Author: VladaZakharova <80...@users.noreply.github.com>
AuthorDate: Sat Dec 3 11:47:03 2022 +0100

    Add deferrable mode to CloudBuildCreateBuildOperator (#27783)
    
    (cherry picked from commit c931d888936a958ae40b69077d35215227bf1dff)
---
 .../providers/google/cloud/hooks/cloud_build.py    |  98 +++++++-
 .../google/cloud/operators/cloud_build.py          |  90 +++++--
 .../providers/google/cloud/triggers/cloud_build.py | 125 ++++++++++
 .../operators/cloud/cloud_build.rst                |  50 ++++
 .../google/cloud/hooks/test_cloud_build.py         |  42 +++-
 .../google/cloud/operators/test_cloud_build.py     | 265 +++++++++++++++++----
 .../google/cloud/triggers/test_cloud_build.py      | 240 +++++++++++++++++++
 .../cloud/cloud_build/example_cloud_build.py       |  18 +-
 ...cloud_build.py => example_cloud_build_async.py} |  44 ++--
 .../cloud_build/example_cloud_build_trigger.py     |  12 +-
 10 files changed, 890 insertions(+), 94 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/cloud_build.py b/airflow/providers/google/cloud/hooks/cloud_build.py
index 6ba6fd06e9..0702968ccf 100644
--- a/airflow/providers/google/cloud/hooks/cloud_build.py
+++ b/airflow/providers/google/cloud/hooks/cloud_build.py
@@ -20,10 +20,11 @@ from __future__ import annotations
 
 from typing import Sequence
 
+from google.api_core.exceptions import AlreadyExists
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.operation import Operation
 from google.api_core.retry import Retry
-from google.cloud.devtools.cloudbuild import CloudBuildClient
+from google.cloud.devtools.cloudbuild_v1 import CloudBuildAsyncClient, CloudBuildClient, GetBuildRequest
 from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource
 
 from airflow.exceptions import AirflowException
@@ -77,6 +78,14 @@ class CloudBuildHook(GoogleBaseHook):
         except Exception:
             raise AirflowException("Could not retrieve Build ID from Operation.")
 
+    def wait_for_operation(self, operation: Operation, timeout: float | None = None):
+        """Waits for long-lasting operation to complete."""
+        try:
+            return operation.result(timeout=timeout)
+        except Exception:
+            error = operation.exception(timeout=timeout)
+            raise AirflowException(error)
+
     def get_conn(self) -> CloudBuildClient:
         """
         Retrieves the connection to Google Cloud Build.
@@ -123,6 +132,41 @@ class CloudBuildHook(GoogleBaseHook):
 
         return build
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_build_without_waiting_for_result(
+        self,
+        build: dict | Build,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> tuple[Operation, str]:
+        """
+        Starts a build with the specified configuration without waiting for it to finish.
+
+        :param build: The build resource to create. If a dict is provided, it must be of the same form
+            as the protobuf message `google.cloud.devtools.cloudbuild_v1.types.Build`
+        :param project_id: Optional, Google Cloud Project project_id where the function belongs.
+            If set to None or missing, the default project_id from the GCP connection is used.
+        :param retry: Optional, a retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional, the amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Optional, additional metadata that is provided to the method.
+        """
+        client = self.get_conn()
+
+        self.log.info("Start creating build...")
+
+        operation = client.create_build(
+            request={"project_id": project_id, "build": build},
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        id_ = self._get_build_id_from_operation(operation)
+        return operation, id_
+
     @GoogleBaseHook.fallback_to_default_project_id
     def create_build(
         self,
@@ -150,7 +194,7 @@ class CloudBuildHook(GoogleBaseHook):
         """
         client = self.get_conn()
 
-        self.log.info("Start creating build.")
+        self.log.info("Start creating build...")
 
         operation = client.create_build(
             request={"project_id": project_id, "build": build},
@@ -195,14 +239,17 @@ class CloudBuildHook(GoogleBaseHook):
         """
         client = self.get_conn()
 
-        self.log.info("Start creating build trigger.")
+        self.log.info("Start creating build trigger...")
 
-        trigger = client.create_build_trigger(
-            request={"project_id": project_id, "trigger": trigger},
-            retry=retry,
-            timeout=timeout,
-            metadata=metadata,
-        )
+        try:
+            trigger = client.create_build_trigger(
+                request={"project_id": project_id, "trigger": trigger},
+                retry=retry,
+                timeout=timeout,
+                metadata=metadata,
+            )
+        except AlreadyExists:
+            raise AirflowException("Cloud Build Trigger with such parameters already exists.")
 
         self.log.info("Build trigger has been created.")
 
@@ -492,7 +539,6 @@ class CloudBuildHook(GoogleBaseHook):
         client = self.get_conn()
 
         self.log.info("Start running build trigger: %s.", trigger_id)
-
         operation = client.run_build_trigger(
             request={"project_id": project_id, "trigger_id": trigger_id, "source": source},
             retry=retry,
@@ -504,7 +550,6 @@ class CloudBuildHook(GoogleBaseHook):
 
         if not wait:
             return self.get_build(id_=id_, project_id=project_id)
-
         operation.result()
 
         self.log.info("Build trigger has been run: %s.", trigger_id)
@@ -550,3 +595,34 @@ class CloudBuildHook(GoogleBaseHook):
         self.log.info("Build trigger has been updated: %s.", trigger_id)
 
         return trigger
+
+
+class CloudBuildAsyncHook(GoogleBaseHook):
+    """Asynchronous Hook for the Google Cloud Build Service."""
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    async def get_cloud_build(
+        self,
+        id_: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Build:
+        """Retrieves a Cloud Build with a specified id."""
+        if not id_:
+            raise AirflowException("Google Cloud Build id is required.")
+
+        client = CloudBuildAsyncClient()
+
+        request = GetBuildRequest(
+            project_id=project_id,
+            id=id_,
+        )
+        build_instance = await client.get_build(
+            request=request,
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return build_instance
diff --git a/airflow/providers/google/cloud/operators/cloud_build.py b/airflow/providers/google/cloud/operators/cloud_build.py
index c33fa36c64..33d8f40c88 100644
--- a/airflow/providers/google/cloud/operators/cloud_build.py
+++ b/airflow/providers/google/cloud/operators/cloud_build.py
@@ -37,6 +37,8 @@ from airflow.providers.google.cloud.links.cloud_build import (
     CloudBuildTriggerDetailsLink,
     CloudBuildTriggersListLink,
 )
+from airflow.providers.google.cloud.triggers.cloud_build import CloudBuildCreateBuildTrigger
+from airflow.providers.google.common.consts import GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
 from airflow.utils import yaml
 
 if TYPE_CHECKING:
@@ -147,7 +149,13 @@ class CloudBuildCreateBuildOperator(BaseOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
-
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param retry: Designation of what errors, if any, should be retried.
+    :param timeout: The timeout for this request.
+    :param metadata: Strings which should be sent along with the request as metadata.
+    :param deferrable: Run operator in the deferrable mode
     """
 
     template_fields: Sequence[str] = ("project_id", "build", "gcp_conn_id", "impersonation_chain")
@@ -164,9 +172,15 @@ class CloudBuildCreateBuildOperator(BaseOperator):
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
+        poll_interval: float = 4.0,
+        deferrable: bool = False,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
+        self.build = build
+        # Not template fields to keep original value
+        self.build_raw = build
         self.project_id = project_id
         self.wait = wait
         self.retry = retry
@@ -174,9 +188,9 @@ class CloudBuildCreateBuildOperator(BaseOperator):
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
-        self.build = build
-        # Not template fields to keep original value
-        self.build_raw = build
+        self.delegate_to = delegate_to
+        self.poll_interval = poll_interval
+        self.deferrable = deferrable
 
     def prepare_template(self) -> None:
         # if no file is specified, skip
@@ -189,29 +203,69 @@ class CloudBuildCreateBuildOperator(BaseOperator):
                 self.build = json.loads(file.read())
 
     def execute(self, context: Context):
-        hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
-
+        hook = CloudBuildHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+            delegate_to=self.delegate_to,
+        )
         build = BuildProcessor(build=self.build).process_body()
 
-        result = hook.create_build(
+        self.cloud_build_operation, self.id_ = hook.create_build_without_waiting_for_result(
             build=build,
             project_id=self.project_id,
-            wait=self.wait,
             retry=self.retry,
             timeout=self.timeout,
             metadata=self.metadata,
         )
-
-        self.xcom_push(context, key="id", value=result.id)
-        project_id = self.project_id or hook.project_id
-        if project_id:
-            CloudBuildLink.persist(
-                context=context,
-                task_instance=self,
-                project_id=project_id,
-                build_id=result.id,
+        self.xcom_push(context, key="id", value=self.id_)
+        if not self.wait:
+            return Build.to_dict(hook.get_build(id_=self.id_, project_id=self.project_id))
+
+        if self.deferrable:
+            self.defer(
+                trigger=CloudBuildCreateBuildTrigger(
+                    id_=self.id_,
+                    project_id=self.project_id,
+                    gcp_conn_id=self.gcp_conn_id,
+                    impersonation_chain=self.impersonation_chain,
+                    delegate_to=self.delegate_to,
+                    poll_interval=self.poll_interval,
+                ),
+                method_name=GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME,
             )
-        return Build.to_dict(result)
+        else:
+            cloud_build_instance_result = hook.wait_for_operation(
+                timeout=self.timeout, operation=self.cloud_build_operation
+            )
+            project_id = self.project_id or hook.project_id
+            if project_id:
+                CloudBuildLink.persist(
+                    context=context,
+                    task_instance=self,
+                    project_id=project_id,
+                    build_id=cloud_build_instance_result.id,
+                )
+            return Build.to_dict(cloud_build_instance_result)
+
+    def execute_complete(self, context: Context, event: dict):
+        if event["status"] == "success":
+            hook = CloudBuildHook(
+                gcp_conn_id=self.gcp_conn_id,
+                impersonation_chain=self.impersonation_chain,
+                delegate_to=self.delegate_to,
+            )
+            self.log.info("Cloud Build completed with response %s ", event["message"])
+            project_id = self.project_id or hook.project_id
+            if project_id:
+                CloudBuildLink.persist(
+                    context=context,
+                    task_instance=self,
+                    project_id=project_id,
+                    build_id=event["id_"],
+                )
+            return event["instance"]
+        else:
+            raise AirflowException(f"Unexpected error in the operation: {event['message']}")
 
 
 class CloudBuildCreateBuildTriggerOperator(BaseOperator):
diff --git a/airflow/providers/google/cloud/triggers/cloud_build.py b/airflow/providers/google/cloud/triggers/cloud_build.py
new file mode 100644
index 0000000000..130fc857f8
--- /dev/null
+++ b/airflow/providers/google/cloud/triggers/cloud_build.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any, AsyncIterator, Sequence
+
+from google.cloud.devtools.cloudbuild_v1.types import Build
+
+from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildAsyncHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class CloudBuildCreateBuildTrigger(BaseTrigger):
+    """
+    CloudBuildCreateBuildTrigger runs on the trigger worker to perform the create Build operation.
+
+    :param id_: The ID of the build.
+    :param project_id: Google Cloud Project where the job is running
+    :param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param poll_interval: polling period in seconds to check for the status
+    """
+
+    def __init__(
+        self,
+        id_: str,
+        project_id: str | None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        delegate_to: str | None = None,
+        poll_interval: float = 4.0,
+    ):
+        super().__init__()
+        self.id_ = id_
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.delegate_to = delegate_to
+        self.poll_interval = poll_interval
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serializes CloudBuildCreateBuildTrigger arguments and classpath."""
+        return (
+            "airflow.providers.google.cloud.triggers.cloud_build.CloudBuildCreateBuildTrigger",
+            {
+                "id_": self.id_,
+                "project_id": self.project_id,
+                "gcp_conn_id": self.gcp_conn_id,
+                "impersonation_chain": self.impersonation_chain,
+                "delegate_to": self.delegate_to,
+                "poll_interval": self.poll_interval,
+            },
+        )
+
+    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
+        """Gets current build execution status and yields a TriggerEvent"""
+        hook = self._get_async_hook()
+        while True:
+            try:
+                # Poll for job execution status
+                cloud_build_instance = await hook.get_cloud_build(
+                    id_=self.id_,
+                    project_id=self.project_id,
+                )
+                if cloud_build_instance._pb.status in (Build.Status.SUCCESS,):
+                    yield TriggerEvent(
+                        {
+                            "instance": Build.to_dict(cloud_build_instance),
+                            "id_": self.id_,
+                            "status": "success",
+                            "message": "Build completed",
+                        }
+                    )
+                elif cloud_build_instance._pb.status in (
+                    Build.Status.WORKING,
+                    Build.Status.PENDING,
+                    Build.Status.QUEUED,
+                ):
+                    self.log.info("Build is still running...")
+                    self.log.info("Sleeping for %s seconds.", self.poll_interval)
+                    await asyncio.sleep(self.poll_interval)
+                elif cloud_build_instance._pb.status in (
+                    Build.Status.FAILURE,
+                    Build.Status.INTERNAL_ERROR,
+                    Build.Status.TIMEOUT,
+                    Build.Status.CANCELLED,
+                    Build.Status.EXPIRED,
+                ):
+                    yield TriggerEvent({"status": "error", "message": cloud_build_instance.status_detail})
+                else:
+                    yield TriggerEvent(
+                        {"status": "error", "message": "Unidentified status of Cloud Build instance"}
+                    )
+
+            except Exception as e:
+                self.log.exception("Exception occurred while checking for Cloud Build completion")
+                yield TriggerEvent({"status": "error", "message": str(e)})
+
+    def _get_async_hook(self) -> CloudBuildAsyncHook:
+        return CloudBuildAsyncHook(gcp_conn_id=self.gcp_conn_id)
diff --git a/docs/apache-airflow-providers-google/operators/cloud/cloud_build.rst b/docs/apache-airflow-providers-google/operators/cloud/cloud_build.rst
index 0e39a1399a..454e378c2f 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/cloud_build.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/cloud_build.rst
@@ -102,6 +102,14 @@ Trigger a build is performed with the
     :start-after: [START howto_operator_create_build_from_storage]
     :end-before: [END howto_operator_create_build_from_storage]
 
+You can use deferrable mode for this action in order to run the operator asynchronously:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_build/example_cloud_build_async.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_operator_create_build_from_storage_async]
+    :end-before: [END howto_operator_create_build_from_storage_async]
+
 You can use :ref:`Jinja templating <concepts:jinja-templating>` with
 :template-fields:`airflow.providers.google.cloud.operators.cloud_build.CloudBuildCreateBuildOperator`
 parameters which allows you to dynamically determine values. The result is saved to :ref:`XCom <concepts:xcom>`, which allows it
@@ -122,6 +130,48 @@ you can pass wait=False as example shown below.
     :start-after: [START howto_operator_create_build_without_wait]
     :end-before: [END howto_operator_create_build_without_wait]
 
+You can use deferrable mode for this action in order to run the operator asynchronously:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_build/example_cloud_build_async.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_operator_create_build_without_wait_async]
+    :end-before: [END howto_operator_create_build_without_wait_async]
+
+In order to start a build on Cloud Build you can use a build configuration file. A build config file defines the fields
+that are needed for Cloud Build to perform your tasks. You can write the build config file using the YAML or the JSON syntax.
+
+.. exampleinclude:: ../../../../tests/system/providers/google/cloud/cloud_build/example_cloud_build.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcp_create_build_from_yaml_body]
+    :end-before: [END howto_operator_gcp_create_build_from_yaml_body]
+
+You can use deferrable mode for this action in order to run the operator asynchronously:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_build/example_cloud_build_async.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_operator_gcp_create_build_from_yaml_body_async]
+    :end-before: [END howto_operator_gcp_create_build_from_yaml_body_async]
+
+In addition, a Cloud Build can refer to source stored in `Google Cloud Source Repositories <https://cloud.google.com/source-repositories/docs/>`__.
+Once the build has started, it will build the code in the source repositories.
+
+.. exampleinclude:: ../../../../tests/system/providers/google/cloud/cloud_build/example_cloud_build.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_operator_create_build_from_repo]
+    :end-before: [END howto_operator_create_build_from_repo]
+
+You can use deferrable mode for this action in order to run the operator asynchronously:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/cloud_build/example_cloud_build_async.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_operator_create_build_from_repo_async]
+    :end-before: [END howto_operator_create_build_from_repo_async]
+
 .. _howto/operator:CloudBuildCreateBuildTriggerOperator:
 
 CloudBuildCreateBuildTriggerOperator
diff --git a/tests/providers/google/cloud/hooks/test_cloud_build.py b/tests/providers/google/cloud/hooks/test_cloud_build.py
index ec0e83e5e9..78af318414 100644
--- a/tests/providers/google/cloud/hooks/test_cloud_build.py
+++ b/tests/providers/google/cloud/hooks/test_cloud_build.py
@@ -21,18 +21,29 @@ functions in CloudBuildHook
 """
 from __future__ import annotations
 
+import sys
 import unittest
+from concurrent.futures import Future
 from unittest.mock import MagicMock, patch
 
+import pytest
 from google.api_core.gapic_v1.method import DEFAULT
+from google.cloud.devtools.cloudbuild_v1 import CloudBuildAsyncClient, GetBuildRequest
 
-from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook
+from airflow import AirflowException
+from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildAsyncHook, CloudBuildHook
 from airflow.providers.google.common.consts import CLIENT_INFO
 from tests.providers.google.cloud.utils.base_gcp_mock import mock_base_gcp_hook_no_default_project_id
 
+if sys.version_info < (3, 8):
+    from asynctest import mock
+else:
+    from unittest import mock
+
 PROJECT_ID = "cloud-build-project"
 LOCATION = "test-location"
 PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}"
+CLOUD_BUILD_PATH = "airflow.providers.google.cloud.hooks.cloud_build.{}"
 BUILD_ID = "test-build-id-9832662"
 REPO_SOURCE = {"repo_source": {"repo_name": "test_repo", "branch_name": "main"}}
 BUILD = {
@@ -298,3 +309,32 @@ class TestCloudBuildHook(unittest.TestCase):
             timeout=None,
             metadata=(),
         )
+
+
+class TestAsyncHook:
+    @pytest.fixture
+    def hook(self):
+        return CloudBuildAsyncHook(
+            gcp_conn_id="google_cloud_default",
+        )
+
+    @pytest.mark.asyncio
+    @mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self: None)
+    @mock.patch(CLOUD_BUILD_PATH.format("CloudBuildAsyncClient.get_build"))
+    async def test_async_cloud_build_service_client_creation_should_execute_successfully(
+        self, mocked_get_build, hook
+    ):
+        mocked_get_build.return_value = Future()
+        await hook.get_cloud_build(project_id=PROJECT_ID, id_=BUILD_ID)
+        request = GetBuildRequest(
+            dict(
+                project_id=PROJECT_ID,
+                id=BUILD_ID,
+            )
+        )
+        mocked_get_build.assert_called_once_with(request=request, retry=DEFAULT, timeout=None, metadata=())
+
+    @pytest.mark.asyncio
+    async def test_async_get_clod_build_without_build_id_should_throw_exception(self, hook):
+        with pytest.raises(AirflowException, match=r"Google Cloud Build id is required."):
+            await hook.get_cloud_build(project_id=PROJECT_ID, id_=None)
diff --git a/tests/providers/google/cloud/operators/test_cloud_build.py b/tests/providers/google/cloud/operators/test_cloud_build.py
index 1334679a78..e80dcd1fb0 100644
--- a/tests/providers/google/cloud/operators/test_cloud_build.py
+++ b/tests/providers/google/cloud/operators/test_cloud_build.py
@@ -30,7 +30,10 @@ from google.api_core.gapic_v1.method import DEFAULT
 from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource, StorageSource
 from parameterized import parameterized
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, TaskDeferred
+from airflow.models import DAG
+from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
 from airflow.providers.google.cloud.operators.cloud_build import (
     BuildProcessor,
     CloudBuildCancelBuildOperator,
@@ -45,12 +48,16 @@ from airflow.providers.google.cloud.operators.cloud_build import (
     CloudBuildRunBuildTriggerOperator,
     CloudBuildUpdateBuildTriggerOperator,
 )
+from airflow.providers.google.cloud.triggers.cloud_build import CloudBuildCreateBuildTrigger
+from airflow.utils.timezone import datetime
+from airflow.utils.types import DagRunType
 
 # pylint: disable=R0904, C0111
 
 
 GCP_CONN_ID = "google_cloud_default"
 PROJECT_ID = "cloud-build-project"
+CLOUD_BUILD_HOOK_PATH = "airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook"
 BUILD_ID = "test-build-id-9832661"
 REPO_SOURCE = {"repo_source": {"repo_name": "test_repo", "branch_name": "main"}}
 BUILD = {
@@ -65,35 +72,71 @@ BUILD_TRIGGER = {
 }
 OPERATION = {"metadata": {"build": {"id": BUILD_ID}}}
 TRIGGER_ID = "32488e7f-09d6-4fe9-a5fb-4ca1419a6e7a"
+TEST_BUILD_INSTANCE = dict(
+    id="test-build-id-9832662",
+    status=3,
+    steps=[
+        {
+            "name": "ubuntu",
+            "env": [],
+            "args": [],
+            "dir_": "",
+            "id": "",
+            "wait_for": [],
+            "entrypoint": "",
+            "secret_env": [],
+            "volumes": [],
+            "status": 0,
+            "script": "",
+        }
+    ],
+    name="",
+    project_id="",
+    status_detail="",
+    images=[],
+    logs_bucket="",
+    build_trigger_id="",
+    log_url="",
+    substitutions={},
+    tags=[],
+    secrets=[],
+    timing={},
+    service_account="",
+    warnings=[],
+)
 
 
 class TestCloudBuildOperator(TestCase):
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_cancel_build(self, mock_hook):
         mock_hook.return_value.cancel_build.return_value = Build()
+
         operator = CloudBuildCancelBuildOperator(id_=TRIGGER_ID, task_id="id")
-        context = mock.MagicMock()
-        operator.execute(context=context)
+        operator.execute(context=mock.MagicMock())
+
         mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
         mock_hook.return_value.cancel_build.assert_called_once_with(
             id_=TRIGGER_ID, project_id=None, retry=DEFAULT, timeout=None, metadata=()
         )
 
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_create_build(self, mock_hook):
-        mock_hook.return_value.create_build.return_value = Build()
+        mock_hook.return_value.create_build_without_waiting_for_result.return_value = (BUILD, BUILD_ID)
+        mock_hook.return_value.wait_for_operation.return_value = Build()
+
         operator = CloudBuildCreateBuildOperator(build=BUILD, task_id="id")
-        context = mock.MagicMock()
-        operator.execute(context=context)
-        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
+        operator.execute(context=mock.MagicMock())
+
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None, delegate_to=None)
         build = Build(BUILD)
-        mock_hook.return_value.create_build.assert_called_once_with(
-            build=build, project_id=None, wait=True, retry=DEFAULT, timeout=None, metadata=()
+        mock_hook.return_value.create_build_without_waiting_for_result.assert_called_once_with(
+            build=build, project_id=None, retry=DEFAULT, timeout=None, metadata=()
         )
+        mock_hook.return_value.wait_for_operation.assert_called_once_with(timeout=None, operation=BUILD)
 
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_create_build_with_missing_build(self, mock_hook):
-        mock_hook.return_value.create_build.return_value = Build()
+        mock_hook.return_value.create_build_without_waiting_for_result.return_value = Build()
         with pytest.raises(AirflowException, match="missing keyword argument 'build'"):
             CloudBuildCreateBuildOperator(task_id="id")
 
@@ -125,56 +168,61 @@ class TestCloudBuildOperator(TestCase):
             expected_body = {"steps": [{"name": "ubuntu", "args": ["echo", "Hello {{ params.name }}!"]}]}
             assert expected_body == operator.build
 
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_create_build_trigger(self, mock_hook):
         mock_hook.return_value.create_build_trigger.return_value = BuildTrigger()
+
         operator = CloudBuildCreateBuildTriggerOperator(trigger=BUILD_TRIGGER, task_id="id")
-        context = mock.MagicMock()
-        operator.execute(context=context)
+        operator.execute(context=mock.MagicMock())
+
         mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
         mock_hook.return_value.create_build_trigger.assert_called_once_with(
             trigger=BUILD_TRIGGER, project_id=None, retry=DEFAULT, timeout=None, metadata=()
         )
 
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_delete_build_trigger(self, mock_hook):
         mock_hook.return_value.delete_build_trigger.return_value = None
+
         operator = CloudBuildDeleteBuildTriggerOperator(trigger_id=TRIGGER_ID, task_id="id")
-        context = mock.MagicMock()
-        operator.execute(context=context)
+        operator.execute(context=mock.MagicMock())
+
         mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
         mock_hook.return_value.delete_build_trigger.assert_called_once_with(
             trigger_id=TRIGGER_ID, project_id=None, retry=DEFAULT, timeout=None, metadata=()
         )
 
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_get_build(self, mock_hook):
         mock_hook.return_value.get_build.return_value = Build()
+
         operator = CloudBuildGetBuildOperator(id_=BUILD_ID, task_id="id")
-        context = mock.MagicMock()
-        operator.execute(context=context)
+        operator.execute(context=mock.MagicMock())
+
         mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
         mock_hook.return_value.get_build.assert_called_once_with(
             id_=BUILD_ID, project_id=None, retry=DEFAULT, timeout=None, metadata=()
         )
 
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_get_build_trigger(self, mock_hook):
         mock_hook.return_value.get_build_trigger.return_value = BuildTrigger()
+
         operator = CloudBuildGetBuildTriggerOperator(trigger_id=TRIGGER_ID, task_id="id")
-        context = mock.MagicMock()
-        operator.execute(context=context)
+        operator.execute(context=mock.MagicMock())
+
         mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
         mock_hook.return_value.get_build_trigger.assert_called_once_with(
             trigger_id=TRIGGER_ID, project_id=None, retry=DEFAULT, timeout=None, metadata=()
         )
 
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_list_build_triggers(self, mock_hook):
         mock_hook.return_value.list_build_triggers.return_value = mock.MagicMock()
+
         operator = CloudBuildListBuildTriggersOperator(task_id="id", location="global")
-        context = mock.MagicMock()
-        operator.execute(context=context)
+        operator.execute(context=mock.MagicMock())
+
         mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
         mock_hook.return_value.list_build_triggers.assert_called_once_with(
             project_id=None,
@@ -186,12 +234,13 @@ class TestCloudBuildOperator(TestCase):
             metadata=(),
         )
 
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_list_builds(self, mock_hook):
         mock_hook.return_value.list_builds.return_value = mock.MagicMock()
+
         operator = CloudBuildListBuildsOperator(task_id="id", location="global")
-        context = mock.MagicMock()
-        operator.execute(context=context)
+        operator.execute(context=mock.MagicMock())
+
         mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
         mock_hook.return_value.list_builds.assert_called_once_with(
             project_id=None,
@@ -203,23 +252,25 @@ class TestCloudBuildOperator(TestCase):
             metadata=(),
         )
 
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_retry_build(self, mock_hook):
         mock_hook.return_value.retry_build.return_value = Build()
+
         operator = CloudBuildRetryBuildOperator(id_=BUILD_ID, task_id="id")
-        context = mock.MagicMock()
-        operator.execute(context=context)
+        operator.execute(context=mock.MagicMock())
+
         mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
         mock_hook.return_value.retry_build.assert_called_once_with(
             id_=BUILD_ID, project_id=None, wait=True, retry=DEFAULT, timeout=None, metadata=()
         )
 
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_run_build_trigger(self, mock_hook):
         mock_hook.return_value.run_build_trigger.return_value = Build()
+
         operator = CloudBuildRunBuildTriggerOperator(trigger_id=TRIGGER_ID, source=REPO_SOURCE, task_id="id")
-        context = mock.MagicMock()
-        operator.execute(context=context)
+        operator.execute(context=mock.MagicMock())
+
         mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
         mock_hook.return_value.run_build_trigger.assert_called_once_with(
             trigger_id=TRIGGER_ID,
@@ -231,14 +282,15 @@ class TestCloudBuildOperator(TestCase):
             metadata=(),
         )
 
-    @mock.patch("airflow.providers.google.cloud.operators.cloud_build.CloudBuildHook")
+    @mock.patch(CLOUD_BUILD_HOOK_PATH)
     def test_update_build_trigger(self, mock_hook):
         mock_hook.return_value.update_build_trigger.return_value = BuildTrigger()
+
         operator = CloudBuildUpdateBuildTriggerOperator(
             trigger_id=TRIGGER_ID, trigger=BUILD_TRIGGER, task_id="id"
         )
-        context = mock.MagicMock()
-        operator.execute(context=context)
+        operator.execute(context=mock.MagicMock())
+
         mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None)
         mock_hook.return_value.update_build_trigger.assert_called_once_with(
             trigger_id=TRIGGER_ID,
@@ -324,3 +376,136 @@ class TestBuildProcessor(TestCase):
 
         BuildProcessor(build=body).process_body()
         assert body == expected_body
+
+
+@mock.patch(CLOUD_BUILD_HOOK_PATH)
+def test_async_create_build_fires_correct_trigger_should_execute_successfully(mock_hook):
+    mock_hook.return_value.create_build_without_waiting_for_result.return_value = (BUILD, BUILD_ID)
+
+    operator = CloudBuildCreateBuildOperator(
+        build=BUILD,
+        task_id="id",
+        deferrable=True,
+    )
+
+    with pytest.raises(TaskDeferred) as exc:
+        operator.execute(create_context(operator))
+
+    assert isinstance(
+        exc.value.trigger, CloudBuildCreateBuildTrigger
+    ), "Trigger is not a CloudBuildCreateBuildTrigger"
+
+
+@mock.patch(CLOUD_BUILD_HOOK_PATH)
+def test_async_create_build_without_wait_should_execute_successfully(mock_hook):
+    mock_hook.return_value.create_build_without_waiting_for_result.return_value = (BUILD, BUILD_ID)
+    mock_hook.return_value.get_build.return_value = Build()
+
+    operator = CloudBuildCreateBuildOperator(
+        build=BUILD,
+        task_id="id",
+        wait=False,
+        deferrable=True,
+    )
+    operator.execute(context=mock.MagicMock())
+
+    mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=None, delegate_to=None)
+    build = Build(BUILD)
+    mock_hook.return_value.create_build_without_waiting_for_result.assert_called_once_with(
+        build=build, project_id=None, retry=DEFAULT, timeout=None, metadata=()
+    )
+    mock_hook.return_value.get_build.assert_called_once_with(id_=BUILD_ID, project_id=None)
+
+
+@mock.patch(CLOUD_BUILD_HOOK_PATH)
+def test_async_create_build_correct_logging_should_execute_successfully(mock_hook):
+    mock_hook.return_value.create_build_without_waiting_for_result.return_value = (BUILD, BUILD_ID)
+    mock_hook.return_value.get_build.return_value = Build()
+
+    operator = CloudBuildCreateBuildOperator(
+        build=BUILD,
+        task_id="id",
+        deferrable=True,
+    )
+    with mock.patch.object(operator.log, "info") as mock_log_info:
+        operator.execute_complete(
+            context=create_context(operator),
+            event={
+                "instance": TEST_BUILD_INSTANCE,
+                "status": "success",
+                "message": "Build completed",
+                "id_": BUILD_ID,
+            },
+        )
+    mock_log_info.assert_called_with("Cloud Build completed with response %s ", "Build completed")
+
+
+def test_async_create_build_error_event_should_throw_exception():
+    operator = CloudBuildCreateBuildOperator(
+        build=BUILD,
+        task_id="id",
+        deferrable=True,
+    )
+    with pytest.raises(AirflowException):
+        operator.execute_complete(context=None, event={"status": "error", "message": "test failure message"})
+
+
+@mock.patch(CLOUD_BUILD_HOOK_PATH)
+def test_async_create_build_with_missing_build_should_throw_exception(mock_hook):
+    mock_hook.return_value.create_build.return_value = Build()
+    with pytest.raises(AirflowException, match="missing keyword argument 'build'"):
+        CloudBuildCreateBuildOperator(task_id="id")
+
+
+@parameterized.expand(
+    [
+        (
+            ".json",
+            json.dumps({"steps": [{"name": "ubuntu", "args": ["echo", "Hello {{ params.name }}!"]}]}),
+        ),
+        (
+            ".yaml",
+            """
+            steps:
+            - name: 'ubuntu'
+              args: ['echo', 'Hello {{ params.name }}!']
+            """,
+        ),
+    ]
+)
+def test_async_load_templated_should_execute_successfully(file_type, file_content):
+    with tempfile.NamedTemporaryFile(suffix=file_type, mode="w+") as f:
+        f.writelines(file_content)
+        f.flush()
+
+        operator = CloudBuildCreateBuildOperator(
+            build=f.name,
+            task_id="task-id",
+            params={"name": "airflow"},
+            deferrable=True,
+        )
+        operator.prepare_template()
+        expected_body = {"steps": [{"name": "ubuntu", "args": ["echo", "Hello {{ params.name }}!"]}]}
+        assert expected_body == operator.build
+
+
+def create_context(task):
+    dag = DAG(dag_id="dag")
+    logical_date = datetime(2022, 1, 1, 0, 0, 0)
+    dag_run = DagRun(
+        dag_id=dag.dag_id,
+        execution_date=logical_date,
+        run_id=DagRun.generate_run_id(DagRunType.MANUAL, logical_date),
+    )
+    task_instance = TaskInstance(task=task)
+    task_instance.dag_run = dag_run
+    task_instance.dag_id = dag.dag_id
+    task_instance.xcom_push = mock.Mock()
+    return {
+        "dag": dag,
+        "run_id": dag_run.run_id,
+        "task": task,
+        "ti": task_instance,
+        "task_instance": task_instance,
+        "logical_date": logical_date,
+    }
diff --git a/tests/providers/google/cloud/triggers/test_cloud_build.py b/tests/providers/google/cloud/triggers/test_cloud_build.py
new file mode 100644
index 0000000000..62203ddacb
--- /dev/null
+++ b/tests/providers/google/cloud/triggers/test_cloud_build.py
@@ -0,0 +1,240 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import logging
+import sys
+
+import pytest
+from google.cloud.devtools.cloudbuild_v1 import CloudBuildAsyncClient
+from google.cloud.devtools.cloudbuild_v1.types import Build, BuildStep
+
+from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildAsyncHook
+from airflow.providers.google.cloud.triggers.cloud_build import CloudBuildCreateBuildTrigger
+from airflow.triggers.base import TriggerEvent
+
+if sys.version_info < (3, 8):
+    from asynctest import mock
+else:
+    from unittest import mock
+
+CLOUD_BUILD_PATH = "airflow.providers.google.cloud.hooks.cloud_build.{}"
+TEST_PROJECT_ID = "cloud-build-project"
+TEST_BUILD_ID = "test-build-id-9832662"
+REPO_SOURCE = {"repo_source": {"repo_name": "test_repo", "branch_name": "main"}}
+TEST_BUILD = {
+    "source": REPO_SOURCE,
+    "steps": [{"name": "gcr.io/cloud-builders/gcloud", "entrypoint": "/bin/sh", "args": ["-c", "ls"]}],
+    "status": "SUCCESS",
+}
+TEST_BUILD_WORKING = {
+    "source": REPO_SOURCE,
+    "steps": [{"name": "gcr.io/cloud-builders/gcloud", "entrypoint": "/bin/sh", "args": ["-c", "ls"]}],
+    "status": "WORKING",
+}
+
+TEST_CONN_ID = "google_cloud_default"
+TEST_POLL_INTERVAL = 4.0
+TEST_BUILD_INSTANCE = dict(
+    id="test-build-id-9832662",
+    status=3,
+    steps=[
+        {
+            "name": "ubuntu",
+            "env": [],
+            "args": [],
+            "dir_": "",
+            "id": "",
+            "wait_for": [],
+            "entrypoint": "",
+            "secret_env": [],
+            "volumes": [],
+            "status": 0,
+            "script": "",
+        }
+    ],
+    name="",
+    project_id="",
+    status_detail="",
+    images=[],
+    logs_bucket="",
+    build_trigger_id="",
+    log_url="",
+    substitutions={},
+    tags=[],
+    secrets=[],
+    timing={},
+    service_account="",
+    warnings=[],
+)
+
+pytest.hook = CloudBuildAsyncHook(gcp_conn_id="google_cloud_default")
+
+
+@pytest.fixture
+def hook():
+    return CloudBuildAsyncHook(
+        gcp_conn_id="google_cloud_default",
+    )
+
+
+def test_async_create_build_trigger_serialization_should_execute_successfully():
+    """
+    Asserts that the CloudBuildCreateBuildTrigger correctly serializes its arguments
+    and classpath.
+    """
+    trigger = CloudBuildCreateBuildTrigger(
+        id_=TEST_BUILD_ID,
+        project_id=TEST_PROJECT_ID,
+        gcp_conn_id=TEST_CONN_ID,
+        impersonation_chain=None,
+        delegate_to=None,
+        poll_interval=TEST_POLL_INTERVAL,
+    )
+    classpath, kwargs = trigger.serialize()
+    assert classpath == "airflow.providers.google.cloud.triggers.cloud_build.CloudBuildCreateBuildTrigger"
+    assert kwargs == {
+        "id_": TEST_BUILD_ID,
+        "project_id": TEST_PROJECT_ID,
+        "gcp_conn_id": TEST_CONN_ID,
+        "impersonation_chain": None,
+        "delegate_to": None,
+        "poll_interval": TEST_POLL_INTERVAL,
+    }
+
+
+@pytest.mark.asyncio
+@mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self: None)
+@mock.patch(CLOUD_BUILD_PATH.format("CloudBuildAsyncClient.get_build"))
+async def test_async_create_build_trigger_triggers_on_success_should_execute_successfully(
+    mock_get_build, hook
+):
+    """
+    Test that CloudBuildCreateBuildTrigger fires a success event once the build reaches a successful state.
+    """
+    mock_get_build.return_value = Build(
+        id=TEST_BUILD_ID, status=Build.Status.SUCCESS, steps=[BuildStep(name="ubuntu")]
+    )
+
+    trigger = CloudBuildCreateBuildTrigger(
+        id_=TEST_BUILD_ID,
+        project_id=TEST_PROJECT_ID,
+        gcp_conn_id=TEST_CONN_ID,
+        impersonation_chain=None,
+        delegate_to=None,
+        poll_interval=TEST_POLL_INTERVAL,
+    )
+
+    generator = trigger.run()
+    actual = await generator.asend(None)
+    assert (
+        TriggerEvent(
+            {
+                "instance": TEST_BUILD_INSTANCE,
+                "id_": TEST_BUILD_ID,
+                "status": "success",
+                "message": "Build completed",
+            }
+        )
+        == actual
+    )
+
+
+@pytest.mark.asyncio
+@mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self: None)
+@mock.patch(CLOUD_BUILD_PATH.format("CloudBuildAsyncClient.get_build"))
+async def test_async_create_build_trigger_triggers_on_running_should_execute_successfully(
+    mock_get_build, hook, caplog
+):
+    """
+    Test that CloudBuildCreateBuildTrigger does not fire while a build is still running.
+    """
+    mock_get_build.return_value = Build(
+        id=TEST_BUILD_ID, status=Build.Status.WORKING, steps=[BuildStep(name="ubuntu")]
+    )
+    caplog.set_level(logging.INFO)
+
+    trigger = CloudBuildCreateBuildTrigger(
+        id_=TEST_BUILD_ID,
+        project_id=TEST_PROJECT_ID,
+        gcp_conn_id=TEST_CONN_ID,
+        impersonation_chain=None,
+        delegate_to=None,
+        poll_interval=TEST_POLL_INTERVAL,
+    )
+    task = asyncio.create_task(trigger.run().__anext__())
+    await asyncio.sleep(0.5)
+
+    # TriggerEvent was not returned
+    assert task.done() is False
+
+    assert "Build is still running..." in caplog.text
+    assert f"Sleeping for {TEST_POLL_INTERVAL} seconds." in caplog.text
+
+    # Prevents error when task is destroyed while in "pending" state
+    asyncio.get_event_loop().stop()
+
+
+@pytest.mark.asyncio
+@mock.patch.object(CloudBuildAsyncClient, "__init__", lambda self: None)
+@mock.patch(CLOUD_BUILD_PATH.format("CloudBuildAsyncClient.get_build"))
+async def test_async_create_build_trigger_triggers_on_error_should_execute_successfully(
+    mock_get_build, hook, caplog
+):
+    """
+    Test that CloudBuildCreateBuildTrigger fires the correct event in case of an error.
+    """
+    mock_get_build.return_value = Build(
+        id=TEST_BUILD_ID, status=Build.Status.FAILURE, steps=[BuildStep(name="ubuntu")], status_detail="error"
+    )
+    caplog.set_level(logging.INFO)
+
+    trigger = CloudBuildCreateBuildTrigger(
+        id_=TEST_BUILD_ID,
+        project_id=TEST_PROJECT_ID,
+        gcp_conn_id=TEST_CONN_ID,
+        impersonation_chain=None,
+        delegate_to=None,
+        poll_interval=TEST_POLL_INTERVAL,
+    )
+
+    generator = trigger.run()
+    actual = await generator.asend(None)
+    assert TriggerEvent({"status": "error", "message": "error"}) == actual
+
+
+@pytest.mark.asyncio
+@mock.patch(CLOUD_BUILD_PATH.format("CloudBuildAsyncHook.get_cloud_build"))
+async def test_async_create_build_trigger_triggers_on_excp_should_execute_successfully(mock_build_inst):
+    """
+    Test that CloudBuildCreateBuildTrigger fires an error event when the hook raises an exception.
+    """
+    mock_build_inst.side_effect = Exception("Test exception")
+
+    trigger = CloudBuildCreateBuildTrigger(
+        id_=TEST_BUILD_ID,
+        project_id=TEST_PROJECT_ID,
+        gcp_conn_id=TEST_CONN_ID,
+        impersonation_chain=None,
+        delegate_to=None,
+        poll_interval=TEST_POLL_INTERVAL,
+    )
+
+    generator = trigger.run()
+    actual = await generator.asend(None)
+    assert TriggerEvent({"status": "error", "message": "Test exception"}) == actual
diff --git a/tests/system/providers/google/cloud/cloud_build/example_cloud_build.py b/tests/system/providers/google/cloud/cloud_build/example_cloud_build.py
index bb770e46a7..0a420b1635 100644
--- a/tests/system/providers/google/cloud/cloud_build/example_cloud_build.py
+++ b/tests/system/providers/google/cloud/cloud_build/example_cloud_build.py
@@ -57,7 +57,7 @@ DAG_ID = "example_gcp_cloud_build"
 
 BUCKET_NAME_SRC = f"bucket-src-{DAG_ID}-{ENV_ID}"
 
-GCP_SOURCE_ARCHIVE_URL = os.environ.get("GCP_CLOUD_BUILD_ARCHIVE_URL", f"gs://{BUCKET_NAME_SRC}/file.tar.gz")
+GCP_SOURCE_ARCHIVE_URL = f"gs://{BUCKET_NAME_SRC}/file.tar.gz"
 GCP_SOURCE_REPOSITORY_NAME = "test-cloud-build-repo"
 
 GCP_SOURCE_ARCHIVE_URL_PARTS = urlsplit(GCP_SOURCE_ARCHIVE_URL)
@@ -100,7 +100,9 @@ with models.DAG(
 
     # [START howto_operator_create_build_from_storage]
     create_build_from_storage = CloudBuildCreateBuildOperator(
-        task_id="create_build_from_storage", project_id=PROJECT_ID, build=create_build_from_storage_body
+        task_id="create_build_from_storage",
+        project_id=PROJECT_ID,
+        build=create_build_from_storage_body,
     )
     # [END howto_operator_create_build_from_storage]
 
@@ -113,7 +115,9 @@ with models.DAG(
 
     # [START howto_operator_create_build_from_repo]
     create_build_from_repo = CloudBuildCreateBuildOperator(
-        task_id="create_build_from_repo", project_id=PROJECT_ID, build=create_build_from_repo_body
+        task_id="create_build_from_repo",
+        project_id=PROJECT_ID,
+        build=create_build_from_repo_body,
     )
     # [END howto_operator_create_build_from_repo]
 
@@ -126,7 +130,9 @@ with models.DAG(
 
     # [START howto_operator_list_builds]
     list_builds = CloudBuildListBuildsOperator(
-        task_id="list_builds", project_id=PROJECT_ID, location="global"
+        task_id="list_builds",
+        project_id=PROJECT_ID,
+        location="global",
     )
     # [END howto_operator_list_builds]
 
@@ -173,7 +179,9 @@ with models.DAG(
     # [END howto_operator_gcp_create_build_from_yaml_body]
 
     delete_bucket_src = GCSDeleteBucketOperator(
-        task_id="delete_bucket_src", bucket_name=BUCKET_NAME_SRC, trigger_rule=TriggerRule.ALL_DONE
+        task_id="delete_bucket_src",
+        bucket_name=BUCKET_NAME_SRC,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
 
     chain(
diff --git a/tests/system/providers/google/cloud/cloud_build/example_cloud_build.py b/tests/system/providers/google/cloud/cloud_build/example_cloud_build_async.py
similarity index 83%
copy from tests/system/providers/google/cloud/cloud_build/example_cloud_build.py
copy to tests/system/providers/google/cloud/cloud_build/example_cloud_build_async.py
index bb770e46a7..c652bd4aef 100644
--- a/tests/system/providers/google/cloud/cloud_build/example_cloud_build.py
+++ b/tests/system/providers/google/cloud/cloud_build/example_cloud_build_async.py
@@ -33,7 +33,7 @@ from pathlib import Path
 from typing import Any, cast
 
 import yaml
-from future.backports.urllib.parse import urlsplit
+from future.backports.urllib.parse import urlparse
 
 from airflow import models
 from airflow.models.baseoperator import chain
@@ -53,14 +53,14 @@ from airflow.utils.trigger_rule import TriggerRule
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
 
-DAG_ID = "example_gcp_cloud_build"
+DAG_ID = "example_gcp_cloud_build_async"
 
 BUCKET_NAME_SRC = f"bucket-src-{DAG_ID}-{ENV_ID}"
 
-GCP_SOURCE_ARCHIVE_URL = os.environ.get("GCP_CLOUD_BUILD_ARCHIVE_URL", f"gs://{BUCKET_NAME_SRC}/file.tar.gz")
+GCP_SOURCE_ARCHIVE_URL = f"gs://{BUCKET_NAME_SRC}/file.tar.gz"
 GCP_SOURCE_REPOSITORY_NAME = "test-cloud-build-repo"
 
-GCP_SOURCE_ARCHIVE_URL_PARTS = urlsplit(GCP_SOURCE_ARCHIVE_URL)
+GCP_SOURCE_ARCHIVE_URL_PARTS = urlparse(GCP_SOURCE_ARCHIVE_URL)
 GCP_SOURCE_BUCKET_NAME = GCP_SOURCE_ARCHIVE_URL_PARTS.netloc
 
 CURRENT_FOLDER = Path(__file__).parent
@@ -98,11 +98,14 @@ with models.DAG(
         bucket=BUCKET_NAME_SRC,
     )
 
-    # [START howto_operator_create_build_from_storage]
+    # [START howto_operator_create_build_from_storage_async]
     create_build_from_storage = CloudBuildCreateBuildOperator(
-        task_id="create_build_from_storage", project_id=PROJECT_ID, build=create_build_from_storage_body
+        task_id="create_build_from_storage",
+        project_id=PROJECT_ID,
+        build=create_build_from_storage_body,
+        deferrable=True,
     )
-    # [END howto_operator_create_build_from_storage]
+    # [END howto_operator_create_build_from_storage_async]
 
     # [START howto_operator_create_build_from_storage_result]
     create_build_from_storage_result = BashOperator(
@@ -111,11 +114,14 @@ with models.DAG(
     )
     # [END howto_operator_create_build_from_storage_result]
 
-    # [START howto_operator_create_build_from_repo]
+    # [START howto_operator_create_build_from_repo_async]
     create_build_from_repo = CloudBuildCreateBuildOperator(
-        task_id="create_build_from_repo", project_id=PROJECT_ID, build=create_build_from_repo_body
+        task_id="create_build_from_repo",
+        project_id=PROJECT_ID,
+        build=create_build_from_repo_body,
+        deferrable=True,
     )
-    # [END howto_operator_create_build_from_repo]
+    # [END howto_operator_create_build_from_repo_async]
 
     # [START howto_operator_create_build_from_repo_result]
     create_build_from_repo_result = BashOperator(
@@ -126,18 +132,21 @@ with models.DAG(
 
     # [START howto_operator_list_builds]
     list_builds = CloudBuildListBuildsOperator(
-        task_id="list_builds", project_id=PROJECT_ID, location="global"
+        task_id="list_builds",
+        project_id=PROJECT_ID,
+        location="global",
     )
     # [END howto_operator_list_builds]
 
-    # [START howto_operator_create_build_without_wait]
+    # [START howto_operator_create_build_without_wait_async]
     create_build_without_wait = CloudBuildCreateBuildOperator(
         task_id="create_build_without_wait",
         project_id=PROJECT_ID,
         build=create_build_from_repo_body,
         wait=False,
+        deferrable=True,
     )
-    # [END howto_operator_create_build_without_wait]
+    # [END howto_operator_create_build_without_wait_async]
 
     # [START howto_operator_cancel_build]
     cancel_build = CloudBuildCancelBuildOperator(
@@ -163,17 +172,20 @@ with models.DAG(
     )
     # [END howto_operator_get_build]
 
-    # [START howto_operator_gcp_create_build_from_yaml_body]
+    # [START howto_operator_gcp_create_build_from_yaml_body_async]
     create_build_from_file = CloudBuildCreateBuildOperator(
         task_id="create_build_from_file",
         project_id=PROJECT_ID,
         build=yaml.safe_load((Path(CURRENT_FOLDER) / "resources" / "example_cloud_build.yaml").read_text()),
         params={"name": "Airflow"},
+        deferrable=True,
     )
-    # [END howto_operator_gcp_create_build_from_yaml_body]
+    # [END howto_operator_gcp_create_build_from_yaml_body_async]
 
     delete_bucket_src = GCSDeleteBucketOperator(
-        task_id="delete_bucket_src", bucket_name=BUCKET_NAME_SRC, trigger_rule=TriggerRule.ALL_DONE
+        task_id="delete_bucket_src",
+        bucket_name=BUCKET_NAME_SRC,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
 
     chain(
diff --git a/tests/system/providers/google/cloud/cloud_build/example_cloud_build_trigger.py b/tests/system/providers/google/cloud/cloud_build/example_cloud_build_trigger.py
index 91e48415d6..5b909dd966 100644
--- a/tests/system/providers/google/cloud/cloud_build/example_cloud_build_trigger.py
+++ b/tests/system/providers/google/cloud/cloud_build/example_cloud_build_trigger.py
@@ -37,9 +37,11 @@ from airflow.providers.google.cloud.operators.cloud_build import (
     CloudBuildRunBuildTriggerOperator,
     CloudBuildUpdateBuildTriggerOperator,
 )
+from airflow.utils.trigger_rule import TriggerRule
 
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+TRIGGER_NAME = f"cloud-build-trigger-{ENV_ID}"
 
 DAG_ID = "example_gcp_cloud_build_trigger"
 
@@ -47,7 +49,7 @@ GCP_SOURCE_REPOSITORY_NAME = "test-cloud-build-repo"
 
 # [START howto_operator_gcp_create_build_trigger_body]
 create_build_trigger_body = {
-    "name": f"test-cloud-build-trigger-{ENV_ID}",
+    "name": TRIGGER_NAME,
     "trigger_template": {
         "project_id": PROJECT_ID,
         "repo_name": GCP_SOURCE_REPOSITORY_NAME,
@@ -59,7 +61,7 @@ create_build_trigger_body = {
 
 # [START howto_operator_gcp_update_build_trigger_body]
 update_build_trigger_body = {
-    "name": f"test-cloud-build-trigger-{ENV_ID}",
+    "name": TRIGGER_NAME,
     "trigger_template": {
         "project_id": PROJECT_ID,
         "repo_name": GCP_SOURCE_REPOSITORY_NAME,
@@ -126,10 +128,14 @@ with models.DAG(
         trigger_id=build_trigger_id,
     )
     # [END howto_operator_delete_build_trigger]
+    delete_build_trigger.trigger_rule = TriggerRule.ALL_DONE
 
     # [START howto_operator_list_build_triggers]
     list_build_triggers = CloudBuildListBuildTriggersOperator(
-        task_id="list_build_triggers", project_id=PROJECT_ID, location="global", page_size=5
+        task_id="list_build_triggers",
+        project_id=PROJECT_ID,
+        location="global",
+        page_size=5,
     )
     # [END howto_operator_list_build_triggers]