You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2023/02/23 15:27:44 UTC
[airflow] branch main updated: Add `DbtCloudJobRunAsyncSensor` (#29695)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e6d3176082 Add `DbtCloudJobRunAsyncSensor` (#29695)
e6d3176082 is described below
commit e6d317608251d2725627ac2da0e60d5c5b206c1e
Author: Phani Kumar <94...@users.noreply.github.com>
AuthorDate: Thu Feb 23 20:57:34 2023 +0530
Add `DbtCloudJobRunAsyncSensor` (#29695)
This PR donates `DbtCloudJobRunAsyncSensor` from the [astronomer-providers](https://github.com/astronomer/astronomer-providers) repo to OSS Airflow.
---
airflow/providers/dbt/cloud/sensors/dbt.py | 61 ++++-
.../operators.rst | 16 ++
.../providers/dbt/cloud/sensors/test_dbt_cloud.py | 61 ++++-
tests/providers/dbt/cloud/triggers/__init__.py | 16 ++
.../providers/dbt/cloud/triggers/test_dbt_cloud.py | 304 +++++++++++++++++++++
.../providers/dbt/cloud/example_dbt_cloud.py | 9 +-
6 files changed, 463 insertions(+), 4 deletions(-)
diff --git a/airflow/providers/dbt/cloud/sensors/dbt.py b/airflow/providers/dbt/cloud/sensors/dbt.py
index 14df6910c9..568c0300b0 100644
--- a/airflow/providers/dbt/cloud/sensors/dbt.py
+++ b/airflow/providers/dbt/cloud/sensors/dbt.py
@@ -16,9 +16,12 @@
# under the License.
from __future__ import annotations
-from typing import TYPE_CHECKING
+import time
+from typing import TYPE_CHECKING, Any
+from airflow import AirflowException
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus
+from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
from airflow.sensors.base import BaseSensorOperator
if TYPE_CHECKING:
@@ -64,3 +67,59 @@ class DbtCloudJobRunSensor(BaseSensorOperator):
raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+ """
+ Checks the status of a dbt Cloud job run asynchronously.
+
+ .. seealso::
+ For more information on the DbtCloudJobRunAsyncSensor, take a look at the guide:
+ :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+ :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+ :param run_id: The job run identifier.
+ :param account_id: The dbt Cloud account identifier.
+ :param poll_interval: Periodic time interval for the sensor to check for job status.
+ :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+ """
+
+ def __init__(
+ self,
+ *,
+ poll_interval: float = 5,
+ timeout: float = 60 * 60 * 24 * 7,
+ **kwargs: Any,
+ ):
+ # Forward timeout to the base sensor: its __init__ assigns self.timeout (its default if omitted).
+ super().__init__(timeout=timeout, **kwargs)
+ self.poll_interval = poll_interval
+
+ def execute(self, context: Context) -> None:
+ """
+ Defers to Trigger class to poll for state of the job run until
+ it reaches a failure state or success state
+ """
+ end_time = time.time() + self.timeout
+ self.defer(
+ timeout=self.execution_timeout,
+ trigger=DbtCloudRunJobTrigger(
+ run_id=self.run_id,
+ conn_id=self.dbt_cloud_conn_id,
+ account_id=self.account_id,
+ poll_interval=self.poll_interval,
+ end_time=end_time,
+ ),
+ method_name="execute_complete",
+ )
+
+ def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
+ """
+ Callback for when the trigger fires - returns immediately.
+ Raises an AirflowException when the event status is "error" or "cancelled";
+ otherwise logs the event message and returns the run id.
+ """
+ if event["status"] in ["error", "cancelled"]:
+ raise AirflowException("Error in dbt: " + event["message"])
+ self.log.info(event["message"])
+ return int(event["run_id"])
diff --git a/docs/apache-airflow-providers-dbt-cloud/operators.rst b/docs/apache-airflow-providers-dbt-cloud/operators.rst
index 1f7b27b280..f936e15a1c 100644
--- a/docs/apache-airflow-providers-dbt-cloud/operators.rst
+++ b/docs/apache-airflow-providers-dbt-cloud/operators.rst
@@ -98,6 +98,22 @@ the ``account_id`` for the task is referenced within the ``default_args`` of the
:start-after: [START howto_operator_dbt_cloud_run_job_sensor]
:end-before: [END howto_operator_dbt_cloud_run_job_sensor]
+.. _howto/operator:DbtCloudJobRunAsyncSensor:
+
+Poll for status of a dbt Cloud Job run asynchronously
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Use the :class:`~airflow.providers.dbt.cloud.sensors.dbt.DbtCloudJobRunAsyncSensor`
+(deferrable version) to periodically retrieve the
+status of a dbt Cloud job run asynchronously. This sensor will free up the worker slots since
+polling for job status happens on the Airflow triggerer, leading to efficient utilization
+of resources within Airflow.
+
+.. exampleinclude:: /../../tests/system/providers/dbt/cloud/example_dbt_cloud.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_dbt_cloud_run_job_async_sensor]
+ :end-before: [END howto_operator_dbt_cloud_run_job_async_sensor]
.. _howto/operator:DbtCloudGetJobRunArtifactOperator:
diff --git a/tests/providers/dbt/cloud/sensors/test_dbt_cloud.py b/tests/providers/dbt/cloud/sensors/test_dbt_cloud.py
index 55393c8538..eab6317a44 100644
--- a/tests/providers/dbt/cloud/sensors/test_dbt_cloud.py
+++ b/tests/providers/dbt/cloud/sensors/test_dbt_cloud.py
@@ -16,13 +16,17 @@
# under the License.
from __future__ import annotations
+from unittest import mock
from unittest.mock import patch
import pytest
+from airflow import AirflowException
+from airflow.exceptions import TaskDeferred
from airflow.models.connection import Connection
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus
-from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
+from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunAsyncSensor, DbtCloudJobRunSensor
+from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
from airflow.utils import db
ACCOUNT_ID = 11111
@@ -78,3 +82,58 @@ class TestDbtCloudJobRunSensor:
with pytest.raises(DbtCloudJobRunException, match=error_message):
self.sensor.poke({})
+
+
+class TestDbtCloudJobRunSensorAsync:
+ TASK_ID = "dbt_cloud_run_job"
+ CONN_ID = "dbt_cloud_default"
+ DBT_RUN_ID = 1234
+ TIMEOUT = 300
+
+ def test_dbt_job_run_sensor_async(self):
+ """Assert execute method defer for Dbt cloud job run status sensors"""
+ task = DbtCloudJobRunAsyncSensor(
+ dbt_cloud_conn_id=self.CONN_ID,
+ task_id=self.TASK_ID,
+ run_id=self.DBT_RUN_ID,
+ timeout=self.TIMEOUT,
+ )
+ with pytest.raises(TaskDeferred) as exc:
+ task.execute({})
+ assert isinstance(exc.value.trigger, DbtCloudRunJobTrigger), "Trigger is not a DbtCloudRunJobTrigger"
+
+ def test_dbt_job_run_sensor_async_execute_complete_success(self):
+ """Assert execute_complete logs the success message when the trigger fires with the target status"""
+ task = DbtCloudJobRunAsyncSensor(
+ dbt_cloud_conn_id=self.CONN_ID,
+ task_id=self.TASK_ID,
+ run_id=self.DBT_RUN_ID,
+ timeout=self.TIMEOUT,
+ )
+
+ msg = f"Job run {self.DBT_RUN_ID} has completed successfully."
+ with mock.patch.object(task.log, "info") as mock_log_info:
+ task.execute_complete(
+ context={}, event={"status": "success", "message": msg, "run_id": self.DBT_RUN_ID}
+ )
+ mock_log_info.assert_called_with(msg)
+
+ @pytest.mark.parametrize(
+ "mock_status, mock_message",
+ [
+ ("cancelled", "Job run 1234 has been cancelled."),
+ ("error", "Job run 1234 has failed."),
+ ],
+ )
+ def test_dbt_job_run_sensor_async_execute_complete_failure(self, mock_status, mock_message):
+ """Assert execute_complete method to raise exception on the cancelled and error status"""
+ task = DbtCloudJobRunAsyncSensor(
+ dbt_cloud_conn_id=self.CONN_ID,
+ task_id=self.TASK_ID,
+ run_id=self.DBT_RUN_ID,
+ timeout=self.TIMEOUT,
+ )
+ with pytest.raises(AirflowException):
+ task.execute_complete(
+ context={}, event={"status": mock_status, "message": mock_message, "run_id": self.DBT_RUN_ID}
+ )
diff --git a/tests/providers/dbt/cloud/triggers/__init__.py b/tests/providers/dbt/cloud/triggers/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/dbt/cloud/triggers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/dbt/cloud/triggers/test_dbt_cloud.py b/tests/providers/dbt/cloud/triggers/test_dbt_cloud.py
new file mode 100644
index 0000000000..1436112d22
--- /dev/null
+++ b/tests/providers/dbt/cloud/triggers/test_dbt_cloud.py
@@ -0,0 +1,304 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import sys
+import time
+
+import pytest
+
+from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus
+from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
+from airflow.triggers.base import TriggerEvent
+
+if sys.version_info < (3, 8):
+ # For compatibility with Python 3.7
+ from asynctest import mock as async_mock
+ from asynctest.mock import CoroutineMock as AsyncMock
+else:
+ from unittest import mock as async_mock
+ from unittest.mock import AsyncMock
+
+
+class TestDbtCloudRunJobTrigger:
+ DAG_ID = "dbt_cloud_run"
+ TASK_ID = "dbt_cloud_run_task_op"
+ RUN_ID = 1234
+ CONN_ID = "dbt_cloud_default"
+ ACCOUNT_ID = 12340
+ END_TIME = time.time() + 60 * 60 * 24 * 7
+ POLL_INTERVAL = 3.0
+
+ def test_serialization(self):
+ """Assert DbtCloudRunJobTrigger correctly serializes its arguments and classpath."""
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+ classpath, kwargs = trigger.serialize()
+ assert classpath == "airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger"
+ assert kwargs == {
+ "run_id": self.RUN_ID,
+ "account_id": self.ACCOUNT_ID,
+ "conn_id": self.CONN_ID,
+ "end_time": self.END_TIME,
+ "poll_interval": self.POLL_INTERVAL,
+ }
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+ async def test_dbt_run_job_trigger(self, mocked_is_still_running):
+ """Test DbtCloudRunJobTrigger is triggered with mocked details and run successfully."""
+ mocked_is_still_running.return_value = True
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+ task = asyncio.create_task(trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+
+ # TriggerEvent was not returned
+ assert task.done() is False
+ asyncio.get_event_loop().stop()
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "mock_value, mock_status, mock_message",
+ [
+ (DbtCloudJobRunStatus.SUCCESS.value, "success", "Job run 1234 has completed successfully."),
+ ],
+ )
+ @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+ @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+ async def test_dbt_job_run_for_terminal_status_success(
+ self, mock_get_job_status, mocked_is_still_running, mock_value, mock_status, mock_message
+ ):
+ """Assert that run trigger success message in case of job success"""
+ mocked_is_still_running.return_value = False
+ mock_get_job_status.return_value = mock_value
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+ expected_result = {
+ "status": mock_status,
+ "message": mock_message,
+ "run_id": self.RUN_ID,
+ }
+ task = asyncio.create_task(trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+ assert TriggerEvent(expected_result) == task.result()
+ asyncio.get_event_loop().stop()
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "mock_value, mock_status, mock_message",
+ [
+ (DbtCloudJobRunStatus.CANCELLED.value, "cancelled", "Job run 1234 has been cancelled."),
+ ],
+ )
+ @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+ @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+ async def test_dbt_job_run_for_terminal_status_cancelled(
+ self, mock_get_job_status, mocked_is_still_running, mock_value, mock_status, mock_message
+ ):
+ """Assert that run yields a cancelled event when the job run is cancelled"""
+ mocked_is_still_running.return_value = False
+ mock_get_job_status.return_value = mock_value
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+ expected_result = {
+ "status": mock_status,
+ "message": mock_message,
+ "run_id": self.RUN_ID,
+ }
+ task = asyncio.create_task(trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+ assert TriggerEvent(expected_result) == task.result()
+ asyncio.get_event_loop().stop()
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "mock_value, mock_status, mock_message",
+ [
+ (DbtCloudJobRunStatus.ERROR.value, "error", "Job run 1234 has failed."),
+ ],
+ )
+ @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+ @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+ async def test_dbt_job_run_for_terminal_status_error(
+ self, mock_get_job_status, mocked_is_still_running, mock_value, mock_status, mock_message
+ ):
+ """Assert that run yields an error event when the job run fails"""
+ mocked_is_still_running.return_value = False
+ mock_get_job_status.return_value = mock_value
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+ expected_result = {
+ "status": mock_status,
+ "message": mock_message,
+ "run_id": self.RUN_ID,
+ }
+ task = asyncio.create_task(trigger.run().__anext__())
+ await asyncio.sleep(0.5)
+ assert TriggerEvent(expected_result) == task.result()
+ asyncio.get_event_loop().stop()
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+ @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+ async def test_dbt_job_run_exception(self, mock_get_job_status, mocked_is_still_running):
+ """Assert that run catches the exception if the dbt Cloud job API throws one"""
+ mocked_is_still_running.return_value = False
+ mock_get_job_status.side_effect = Exception("Test exception")
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+ task = [i async for i in trigger.run()]
+ response = TriggerEvent(
+ {
+ "status": "error",
+ "message": "Test exception",
+ "run_id": self.RUN_ID,
+ }
+ )
+ assert len(task) == 1
+ assert response in task
+
+ @pytest.mark.asyncio
+ @async_mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
+ @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+ async def test_dbt_job_run_timeout(self, mock_get_job_status, mocked_is_still_running):
+ """Assert that run timeout after end_time elapsed"""
+ mocked_is_still_running.return_value = True
+ mock_get_job_status.side_effect = Exception("Test exception")
+ end_time = time.time()
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=end_time,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+ generator = trigger.run()
+ actual = await generator.asend(None)
+ expected = TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Job run {self.RUN_ID} has not reached a terminal status "
+ f"after {end_time} seconds.",
+ "run_id": self.RUN_ID,
+ }
+ )
+ assert expected == actual
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "mock_response, expected_status",
+ [
+ (DbtCloudJobRunStatus.SUCCESS.value, False),
+ ],
+ )
+ @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+ async def test_dbt_job_run_is_still_running_success(
+ self, mock_get_job_status, mock_response, expected_status
+ ):
+ """Test is_still_running with mocked response job status and assert
+ the return response with expected value"""
+ hook = AsyncMock(DbtCloudHook)
+ hook.get_job_status.return_value = mock_response
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+ response = await trigger.is_still_running(hook)
+ assert response == expected_status
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "mock_response, expected_status",
+ [
+ (DbtCloudJobRunStatus.RUNNING.value, True),
+ ],
+ )
+ @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+ async def test_dbt_job_run_is_still_running(self, mock_get_job_status, mock_response, expected_status):
+ """Test is_still_running with mocked response job status and assert
+ the return response with expected value"""
+ hook = AsyncMock(DbtCloudHook)
+ hook.get_job_status.return_value = mock_response
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+ response = await trigger.is_still_running(hook)
+ assert response == expected_status
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "mock_response, expected_status",
+ [
+ (DbtCloudJobRunStatus.QUEUED.value, True),
+ ],
+ )
+ @async_mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+ async def test_dbt_job_run_is_still_running_queued(
+ self, mock_get_job_status, mock_response, expected_status
+ ):
+ """Test is_still_running with mocked response job status and assert
+ the return response with expected value"""
+ hook = AsyncMock(DbtCloudHook)
+ hook.get_job_status.return_value = mock_response
+ trigger = DbtCloudRunJobTrigger(
+ conn_id=self.CONN_ID,
+ poll_interval=self.POLL_INTERVAL,
+ end_time=self.END_TIME,
+ run_id=self.RUN_ID,
+ account_id=self.ACCOUNT_ID,
+ )
+ response = await trigger.is_still_running(hook)
+ assert response == expected_status
diff --git a/tests/system/providers/dbt/cloud/example_dbt_cloud.py b/tests/system/providers/dbt/cloud/example_dbt_cloud.py
index feb8b15678..b65104aef7 100644
--- a/tests/system/providers/dbt/cloud/example_dbt_cloud.py
+++ b/tests/system/providers/dbt/cloud/example_dbt_cloud.py
@@ -30,7 +30,7 @@ from airflow.providers.dbt.cloud.operators.dbt import (
DbtCloudListJobsOperator,
DbtCloudRunJobOperator,
)
-from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
+from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunAsyncSensor, DbtCloudJobRunSensor
from airflow.utils.edgemodifier import Label
from tests.system.utils import get_test_env_id
@@ -77,6 +77,12 @@ with DAG(
)
# [END howto_operator_dbt_cloud_run_job_sensor]
+ # [START howto_operator_dbt_cloud_run_job_async_sensor]
+ job_run_async_sensor = DbtCloudJobRunAsyncSensor(
+ task_id="job_run_async_sensor", run_id=trigger_job_run2.output, timeout=20
+ )
+ # [END howto_operator_dbt_cloud_run_job_async_sensor]
+
# [START howto_operator_dbt_cloud_list_jobs]
list_dbt_jobs = DbtCloudListJobsOperator(task_id="list_dbt_jobs", account_id=106277, project_id=160645)
# [END howto_operator_dbt_cloud_list_jobs]
@@ -95,7 +101,6 @@ with DAG(
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()
-
from tests.system.utils import get_test_run # noqa: E402
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)