Posted to commits@airflow.apache.org by "josh-fell (via GitHub)" <gi...@apache.org> on 2023/02/22 17:35:19 UTC

[GitHub] [airflow] josh-fell commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

josh-fell commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1114587026


##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,

Review Comment:
   Would you mind adding this param to the docstring as well?



##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::

Review Comment:
   ```suggestion
           For more information on the DbtCloudJobRunAsyncSensor, take a look at the guide:
   ```



##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,
+        timeout: float = 60 * 60 * 24 * 7,
+        **kwargs: Any,
+    ):
+        self.poll_interval = poll_interval
+        self.timeout = timeout
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> None:
+        """Defers to Trigger class to poll for state of the job run until
+        it reaches a failure state or success state"""

Review Comment:
   ```suggestion
           """
           Defers to Trigger class to poll for state of the job run until
           it reaches a failure state or success state.
           """
   ```



##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,
+        timeout: float = 60 * 60 * 24 * 7,
+        **kwargs: Any,
+    ):
+        self.poll_interval = poll_interval
+        self.timeout = timeout
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> None:
+        """Defers to Trigger class to poll for state of the job run until
+        it reaches a failure state or success state"""
+        end_time = time.time() + self.timeout
+        self.defer(
+            timeout=self.execution_timeout,
+            trigger=DbtCloudRunJobTrigger(
+                run_id=self.run_id,
+                conn_id=self.dbt_cloud_conn_id,
+                account_id=self.account_id,
+                poll_interval=self.poll_interval,
+                end_time=end_time,
+            ),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: Context, event: Dict[str, Any]) -> int:

Review Comment:
   ```suggestion
       def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
   ```
   PEP 563 (postponed evaluation of annotations) should handle this.
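
For context, a minimal sketch of why the built-in `dict[...]` form is safe in an annotation, assuming the module starts with `from __future__ import annotations`. The function below is a hypothetical stand-in, not the sensor's actual method; its body and the `run_id` event key are assumptions made only for illustration.

```python
from __future__ import annotations  # PEP 563: annotations are stored as strings, not evaluated

from typing import Any


def execute_complete(context: Any, event: dict[str, Any]) -> int:
    """Hypothetical stand-in for the sensor method; the body is an assumption."""
    return int(event["run_id"])


# The annotation is kept as its source text and is not evaluated when the
# function is defined, so subscripting the built-in `dict` here does not
# raise on interpreters older than Python 3.9.
print(execute_complete.__annotations__["event"])
```

Without the `__future__` import, `dict[str, Any]` in a signature is evaluated at definition time and raises `TypeError` on Python 3.7 and 3.8, which is the case `typing.Dict` was covering.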


