You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2023/02/23 15:27:44 UTC

[airflow] branch main updated: Add `DbtCloudJobRunAsyncSensor` (#29695)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e6d3176082 Add `DbtCloudJobRunAsyncSensor` (#29695)
e6d3176082 is described below

commit e6d317608251d2725627ac2da0e60d5c5b206c1e
Author: Phani Kumar <94...@users.noreply.github.com>
AuthorDate: Thu Feb 23 20:57:34 2023 +0530

    Add `DbtCloudJobRunAsyncSensor` (#29695)
    
    This PR donates `DbtCloudJobRunAsyncSensor` from [astronomer-providers ](https://github.com/astronomer/astronomer-providers)repo to OSS Airflow
---
 airflow/providers/dbt/cloud/sensors/dbt.py         |  61 ++++-
 .../operators.rst                                  |  16 ++
 .../providers/dbt/cloud/sensors/test_dbt_cloud.py  |  61 ++++-
 tests/providers/dbt/cloud/triggers/__init__.py     |  16 ++
 .../providers/dbt/cloud/triggers/test_dbt_cloud.py | 304 +++++++++++++++++++++
 .../providers/dbt/cloud/example_dbt_cloud.py       |   9 +-
 6 files changed, 463 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/dbt/cloud/sensors/dbt.py b/airflow/providers/dbt/cloud/sensors/dbt.py
index 14df6910c9..568c0300b0 100644
--- a/airflow/providers/dbt/cloud/sensors/dbt.py
+++ b/airflow/providers/dbt/cloud/sensors/dbt.py
@@ -16,9 +16,12 @@
 # under the License.
 from __future__ import annotations
 
-from typing import TYPE_CHECKING
+import time
+from typing import TYPE_CHECKING, Any
 
+from airflow import AirflowException
 from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus
+from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
 from airflow.sensors.base import BaseSensorOperator
 
 if TYPE_CHECKING:
@@ -64,3 +67,59 @@ class DbtCloudJobRunSensor(BaseSensorOperator):
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on the DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param poll_interval: Periodic time interval for the sensor to check for job status.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,
+        timeout: float = 60 * 60 * 24 * 7,
+        **kwargs: Any,
+    ):
+        self.poll_interval = poll_interval
+        self.timeout = timeout
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> None:
+        """
+        Defers to Trigger class to poll for state of the job run until
+        it reaches a failure state or success state
+        """
+        end_time = time.time() + self.timeout
+        self.defer(
+            timeout=self.execution_timeout,
+            trigger=DbtCloudRunJobTrigger(
+                run_id=self.run_id,
+                conn_id=self.dbt_cloud_conn_id,
+                account_id=self.account_id,
+                poll_interval=self.poll_interval,
+                end_time=end_time,
+            ),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
+        if event["status"] in ["error", "cancelled"]:
+            raise AirflowException("Error in dbt: " + event["message"])
+        self.log.info(event["message"])
+        return int(event["run_id"])
diff --git a/docs/apache-airflow-providers-dbt-cloud/operators.rst b/docs/apache-airflow-providers-dbt-cloud/operators.rst
index 1f7b27b280..f936e15a1c 100644
--- a/docs/apache-airflow-providers-dbt-cloud/operators.rst
+++ b/docs/apache-airflow-providers-dbt-cloud/operators.rst
@@ -98,6 +98,22 @@ the ``account_id`` for the task is referenced within the ``default_args`` of the
     :start-after: [START howto_operator_dbt_cloud_run_job_sensor]
     :end-before: [END howto_operator_dbt_cloud_run_job_sensor]
 
+.. _howto/operator:DbtCloudJobRunAsyncSensor:
+
+Poll for status of a dbt Cloud Job run asynchronously
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Use the :class:`~airflow.providers.dbt.cloud.sensors.dbt.DbtCloudJobRunAsyncSensor`
+(deferrable version) to periodically retrieve the
+status of a dbt Cloud job run asynchronously. This sensor will free up the worker slots since
+polling for job status happens on the Airflow triggerer, leading to efficient utilization
+of resources within Airflow.
+
+.. exampleinclude:: /../../tests/system/providers/dbt/cloud/example_dbt_cloud.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dbt_cloud_run_job_async_sensor]
+    :end-before: [END howto_operator_dbt_cloud_run_job_async_sensor]
 
 .. _howto/operator:DbtCloudGetJobRunArtifactOperator:
 
diff --git a/tests/providers/dbt/cloud/sensors/test_dbt_cloud.py b/tests/providers/dbt/cloud/sensors/test_dbt_cloud.py
index 55393c8538..eab6317a44 100644
--- a/tests/providers/dbt/cloud/sensors/test_dbt_cloud.py
+++ b/tests/providers/dbt/cloud/sensors/test_dbt_cloud.py
@@ -16,13 +16,17 @@
 # under the License.
 from __future__ import annotations
 
+from unittest import mock
 from unittest.mock import patch
 
 import pytest
 
+from airflow import AirflowException
+from airflow.exceptions import TaskDeferred
 from airflow.models.connection import Connection
 from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus
-from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
+from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunAsyncSensor, DbtCloudJobRunSensor
+from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
 from airflow.utils import db
 
 ACCOUNT_ID = 11111
@@ -78,3 +82,58 @@ class TestDbtCloudJobRunSensor:
 
             with pytest.raises(DbtCloudJobRunException, match=error_message):
                 self.sensor.poke({})
+
+
+class TestDbtCloudJobRunSensorAsync:
+    TASK_ID = "dbt_cloud_run_job"
+    CONN_ID = "dbt_cloud_default"
+    DBT_RUN_ID = 1234
+    TIMEOUT = 300
+
+    def test_dbt_job_run_sensor_async(self):
+        """Assert execute method defer for Dbt cloud job run status sensors"""
+        task = DbtCloudJobRunAsyncSensor(
+            dbt_cloud_conn_id=self.CONN_ID,
+            task_id=self.TASK_ID,
+            run_id=self.DBT_RUN_ID,
+            timeout=self.TIMEOUT,
+        )
+        with pytest.raises(TaskDeferred) as exc:
+            task.execute({})
+        assert isinstance(exc.value.trigger, DbtCloudRunJobTrigger), "Trigger is not a DbtCloudRunJobTrigger"
+
+    def test_dbt_job_run_sensor_async_execute_complete_success(self):
+        """Assert execute_complete log success message when trigger fire with target status"""
+        task = DbtCloudJobRunAsyncSensor(
+            dbt_cloud_conn_id=self.CONN_ID,
+            task_id=self.TASK_ID,
+            run_id=self.DBT_RUN_ID,
+            timeout=self.TIMEOUT,
+        )
+
+        msg = f"Job run {self.DBT_RUN_ID} has completed successfully."
+        with mock.patch.object(task.log, "info") as mock_log_info:
+            task.execute_complete(
+                context={}, event={"status": "success", "message": msg, "run_id": self.DBT_RUN_ID}
+            )
+        mock_log_info.assert_called_with(msg)
+
+    @pytest.mark.parametrize(
+        "mock_status, mock_message",
+        [
+            ("cancelled", "Job run 1234 has been cancelled."),
+            ("error", "Job run 1234 has failed."),
+        ],
+    )
+    def test_dbt_job_run_sensor_async_execute_complete_failure(self, mock_status, mock_message):
+        """Assert execute_complete method to raise exception on the cancelled and error status"""
+        task = DbtCloudJobRunAsyncSensor(
+            dbt_cloud_conn_id=self.CONN_ID,
+            task_id=self.TASK_ID,
+            run_id=self.DBT_RUN_ID,
+            timeout=self.TIMEOUT,
+        )
+        with pytest.raises(AirflowException):
+            task.execute_complete(
+                context={}, event={"status": mock_status, "message": mock_message, "run_id": self.DBT_RUN_ID}
+            )
diff --git a/tests/providers/dbt/cloud/triggers/__init__.py b/tests/providers/dbt/cloud/triggers/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/dbt/cloud/triggers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/dbt/cloud/triggers/test_dbt_cloud.py b/tests/providers/dbt/cloud/triggers/test_dbt_cloud.py
new file mode 100644
index 0000000000..1436112d22
--- /dev/null
+++ b/tests/providers/dbt/cloud/triggers/test_dbt_cloud.py
@@ -0,0 +1,304 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import sys
+import time
+
+import pytest
+
+from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus
+from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
+from airflow.triggers.base import TriggerEvent
+
+if sys.version_info < (3, 8):
+    # For compatibility with Python 3.7
+    from asynctest import mock as async_mock
+    from asynctest.mock import CoroutineMock as AsyncMock
+else:
+    from unittest import mock as async_mock
+    from unittest.mock import AsyncMock
+
+
+class TestDbtCloudRunJobTrigger:
+    DAG_ID = "dbt_cloud_run"
+    TASK_ID = "dbt_cloud_run_task_op"
+    RUN_ID = 1234
+    CONN_ID = "dbt_cloud_default"
+    ACCOUNT_ID = 12340
+    END_TIME = time.time() + 60 * 60 * 24 * 7
+    POLL_INTERVAL = 3.0
+
+    def test_serialization(self):
+        """Assert DbtCloudRunJobTrigger correctly serializes its arguments and classpath."""
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=self.END_TIME,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+        classpath, kwargs = trigger.serialize()
+        assert classpath == "airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger"
+        assert kwargs == {
+            "run_id": self.RUN_ID,
+            "account_id": self.ACCOUNT_ID,
+            "conn_id": self.CONN_ID,
+            "end_time": self.END_TIME,
+            "poll_interval": self.POLL_INTERVAL,
+        }
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+    async def test_dbt_run_job_trigger(self, mocked_is_still_running):
+        """Test DbtCloudRunJobTrigger is triggered with mocked details and run successfully."""
+        mocked_is_still_running.return_value = True
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=self.END_TIME,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+        task = asyncio.create_task(trigger.run().__anext__())
+        await asyncio.sleep(0.5)
+
+        # TriggerEvent was not returned
+        assert task.done() is False
+        asyncio.get_event_loop().stop()
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "mock_value, mock_status, mock_message",
+        [
+            (DbtCloudJobRunStatus.SUCCESS.value, "success", "Job run 1234 has completed successfully."),
+        ],
+    )
+    @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+    @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+    async def test_dbt_job_run_for_terminal_status_success(
+        self, mock_get_job_status, mocked_is_still_running, mock_value, mock_status, mock_message
+    ):
+        """Assert that run trigger success message in case of job success"""
+        mocked_is_still_running.return_value = False
+        mock_get_job_status.return_value = mock_value
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=self.END_TIME,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+        expected_result = {
+            "status": mock_status,
+            "message": mock_message,
+            "run_id": self.RUN_ID,
+        }
+        task = asyncio.create_task(trigger.run().__anext__())
+        await asyncio.sleep(0.5)
+        assert TriggerEvent(expected_result) == task.result()
+        asyncio.get_event_loop().stop()
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "mock_value, mock_status, mock_message",
+        [
+            (DbtCloudJobRunStatus.CANCELLED.value, "cancelled", "Job run 1234 has been cancelled."),
+        ],
+    )
+    @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+    @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+    async def test_dbt_job_run_for_terminal_status_cancelled(
+        self, mock_get_job_status, mocked_is_still_running, mock_value, mock_status, mock_message
+    ):
+        """Assert that run trigger success message in case of job success"""
+        mocked_is_still_running.return_value = False
+        mock_get_job_status.return_value = mock_value
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=self.END_TIME,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+        expected_result = {
+            "status": mock_status,
+            "message": mock_message,
+            "run_id": self.RUN_ID,
+        }
+        task = asyncio.create_task(trigger.run().__anext__())
+        await asyncio.sleep(0.5)
+        assert TriggerEvent(expected_result) == task.result()
+        asyncio.get_event_loop().stop()
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "mock_value, mock_status, mock_message",
+        [
+            (DbtCloudJobRunStatus.ERROR.value, "error", "Job run 1234 has failed."),
+        ],
+    )
+    @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+    @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+    async def test_dbt_job_run_for_terminal_status_error(
+        self, mock_get_job_status, mocked_is_still_running, mock_value, mock_status, mock_message
+    ):
+        """Assert that run trigger success message in case of job success"""
+        mocked_is_still_running.return_value = False
+        mock_get_job_status.return_value = mock_value
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=self.END_TIME,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+        expected_result = {
+            "status": mock_status,
+            "message": mock_message,
+            "run_id": self.RUN_ID,
+        }
+        task = asyncio.create_task(trigger.run().__anext__())
+        await asyncio.sleep(0.5)
+        assert TriggerEvent(expected_result) == task.result()
+        asyncio.get_event_loop().stop()
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+    @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+    async def test_dbt_job_run_exception(self, mock_get_job_status, mocked_is_still_running):
+        """Assert that run catch exception if dbt cloud job API throw exception"""
+        mocked_is_still_running.return_value = False
+        mock_get_job_status.side_effect = Exception("Test exception")
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=self.END_TIME,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+        task = [i async for i in trigger.run()]
+        response = TriggerEvent(
+            {
+                "status": "error",
+                "message": "Test exception",
+                "run_id": self.RUN_ID,
+            }
+        )
+        assert len(task) == 1
+        assert response in task
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+    @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+    async def test_dbt_job_run_timeout(self, mock_get_job_status, mocked_is_still_running):
+        """Assert that run timeout after end_time elapsed"""
+        mocked_is_still_running.return_value = True
+        mock_get_job_status.side_effect = Exception("Test exception")
+        end_time = time.time()
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=end_time,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+        generator = trigger.run()
+        actual = await generator.asend(None)
+        expected = TriggerEvent(
+            {
+                "status": "error",
+                "message": f"Job run {self.RUN_ID} has not reached a terminal status "
+                f"after {end_time} seconds.",
+                "run_id": self.RUN_ID,
+            }
+        )
+        assert expected == actual
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "mock_response, expected_status",
+        [
+            (DbtCloudJobRunStatus.SUCCESS.value, False),
+        ],
+    )
+    @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+    async def test_dbt_job_run_is_still_running_success(
+        self, mock_get_job_status, mock_response, expected_status
+    ):
+        """Test is_still_running with mocked response job status and assert
+        the return response with expected value"""
+        hook = AsyncMock(DbtCloudHook)
+        hook.get_job_status.return_value = mock_response
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=self.END_TIME,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+        response = await trigger.is_still_running(hook)
+        assert response == expected_status
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "mock_response, expected_status",
+        [
+            (DbtCloudJobRunStatus.RUNNING.value, True),
+        ],
+    )
+    @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+    async def test_dbt_job_run_is_still_running(self, mock_get_job_status, mock_response, expected_status):
+        """Test is_still_running with mocked response job status and assert
+        the return response with expected value"""
+        hook = AsyncMock(DbtCloudHook)
+        hook.get_job_status.return_value = mock_response
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=self.END_TIME,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+        response = await trigger.is_still_running(hook)
+        assert response == expected_status
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "mock_response, expected_status",
+        [
+            (DbtCloudJobRunStatus.QUEUED.value, True),
+        ],
+    )
+    @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+    async def test_dbt_job_run_is_still_running_queued(
+        self, mock_get_job_status, mock_response, expected_status
+    ):
+        """Test is_still_running with mocked response job status and assert
+        the return response with expected value"""
+        hook = AsyncMock(DbtCloudHook)
+        hook.get_job_status.return_value = mock_response
+        trigger = DbtCloudRunJobTrigger(
+            conn_id=self.CONN_ID,
+            poll_interval=self.POLL_INTERVAL,
+            end_time=self.END_TIME,
+            run_id=self.RUN_ID,
+            account_id=self.ACCOUNT_ID,
+        )
+        response = await trigger.is_still_running(hook)
+        assert response == expected_status
diff --git a/tests/system/providers/dbt/cloud/example_dbt_cloud.py b/tests/system/providers/dbt/cloud/example_dbt_cloud.py
index feb8b15678..b65104aef7 100644
--- a/tests/system/providers/dbt/cloud/example_dbt_cloud.py
+++ b/tests/system/providers/dbt/cloud/example_dbt_cloud.py
@@ -30,7 +30,7 @@ from airflow.providers.dbt.cloud.operators.dbt import (
     DbtCloudListJobsOperator,
     DbtCloudRunJobOperator,
 )
-from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
+from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunAsyncSensor, DbtCloudJobRunSensor
 from airflow.utils.edgemodifier import Label
 from tests.system.utils import get_test_env_id
 
@@ -77,6 +77,12 @@ with DAG(
     )
     # [END howto_operator_dbt_cloud_run_job_sensor]
 
+    # [START howto_operator_dbt_cloud_run_job_async_sensor]
+    job_run_async_sensor = DbtCloudJobRunAsyncSensor(
+        task_id="job_run_async_sensor", run_id=trigger_job_run2.output, timeout=20
+    )
+    # [END howto_operator_dbt_cloud_run_job_async_sensor]
+
     # [START howto_operator_dbt_cloud_list_jobs]
     list_dbt_jobs = DbtCloudListJobsOperator(task_id="list_dbt_jobs", account_id=106277, project_id=160645)
     # [END howto_operator_dbt_cloud_list_jobs]
@@ -95,7 +101,6 @@ with DAG(
     # when "tearDown" task with trigger rule is part of the DAG
     list(dag.tasks) >> watcher()
 
-
 from tests.system.utils import get_test_run  # noqa: E402
 
 # Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)