Posted to commits@airflow.apache.org by "dimberman (via GitHub)" <gi...@apache.org> on 2023/02/23 00:24:12 UTC

[GitHub] [airflow] dimberman commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

dimberman commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1115132610


##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,
+        timeout: float = 60 * 60 * 24 * 7,
+        **kwargs: Any,
+    ):
+        self.poll_interval = poll_interval
+        self.timeout = timeout
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> None:
+        """Defers to Trigger class to poll for state of the job run until
+        it reaches a failure state or success state"""
+        end_time = time.time() + self.timeout
+        self.defer(
+            timeout=self.execution_timeout,
+            trigger=DbtCloudRunJobTrigger(
+                run_id=self.run_id,
+                conn_id=self.dbt_cloud_conn_id,
+                account_id=self.account_id,
+                poll_interval=self.poll_interval,
+                end_time=end_time,
+            ),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: Context, event: Dict[str, Any]) -> int:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
+        if event["status"] in ["error", "cancelled"]:
+            raise AirflowException(event["message"])

Review Comment:
   Should we wrap this error? Something like `f"Error in dbt: {event['message']}"`.
   
   That way users know the error originated in dbt rather than in Airflow.
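
A minimal sketch of what that wrapping could look like, assuming the event payload keeps the `status` and `message` keys shown in the diff. `AirflowException` is stubbed here only so the snippet runs without Airflow installed, and the `check_event` helper name and the exact prefix wording are hypothetical, not part of the PR:

```python
from typing import Any, Dict


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException so the sketch runs standalone."""


def check_event(event: Dict[str, Any]) -> None:
    """Raise with a dbt-specific prefix when the trigger reports a failed run."""
    if event["status"] in ["error", "cancelled"]:
        # The prefix is a suggestion; it marks the failure as originating in dbt Cloud.
        raise AirflowException(f"Error in dbt: {event['message']}")
```

In the sensor itself, the same prefixed `raise` would presumably replace the bare `raise AirflowException(event["message"])` inside `execute_complete`.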


