Posted to commits@airflow.apache.org by "phanikumv (via GitHub)" <gi...@apache.org> on 2023/02/22 14:27:41 UTC

[GitHub] [airflow] phanikumv opened a new pull request, #29695: Add `DbtCloudJobRunAsyncSensor`

phanikumv opened a new pull request, #29695:
URL: https://github.com/apache/airflow/pull/29695

   This PR donates `DbtCloudJobRunAsyncSensor` from the [astronomer-providers](https://github.com/astronomer/astronomer-providers) repo to OSS Airflow.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] josh-fell commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

Posted by "josh-fell (via GitHub)" <gi...@apache.org>.
josh-fell commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1114587026


##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,

Review Comment:
   Would you mind adding this param to the docstring as well?
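
A minimal sketch of what the requested docstring entry might look like. The wording of the `poll_interval` description is an assumption (the thread does not show the final text), and the class below is a stripped-down stand-in that does not inherit from the real `DbtCloudJobRunSensor`.

```python
from typing import Any


class DbtCloudJobRunAsyncSensor:
    """
    Checks the status of a dbt Cloud job run asynchronously.

    :param run_id: The job run identifier.
    :param poll_interval: Time in seconds to wait between status checks
        (assumed wording, not taken from the PR).
    :param timeout: Time in seconds to wait for a job run to reach a terminal status.
        Defaults to 7 days.
    """

    def __init__(
        self,
        *,
        poll_interval: float = 5,
        timeout: float = 60 * 60 * 24 * 7,
        **kwargs: Any,
    ) -> None:
        # Store the polling settings; the real sensor would also forward
        # **kwargs to its parent class, which this stand-in does not have.
        self.poll_interval = poll_interval
        self.timeout = timeout
        self.extra_kwargs = kwargs
```

Keeping every keyword argument of `__init__` in the `:param` list means the rendered API reference matches the signature.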





[GitHub] [airflow] josh-fell commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

Posted by "josh-fell (via GitHub)" <gi...@apache.org>.
josh-fell commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1114587026


##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::

Review Comment:
   ```suggestion
           For more information on the DbtCloudJobRunAsyncSensor, take a look at the guide:
   ```



##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,
+        timeout: float = 60 * 60 * 24 * 7,
+        **kwargs: Any,
+    ):
+        self.poll_interval = poll_interval
+        self.timeout = timeout
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> None:
+        """Defers to Trigger class to poll for state of the job run until
+        it reaches a failure state or success state"""

Review Comment:
   ```suggestion
           """
           Defers to Trigger class to poll for state of the job run until
           it reaches a failure state or success state.
           """
   ```



##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,
+        timeout: float = 60 * 60 * 24 * 7,
+        **kwargs: Any,
+    ):
+        self.poll_interval = poll_interval
+        self.timeout = timeout
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> None:
+        """Defers to Trigger class to poll for state of the job run until
+        it reaches a failure state or success state"""
+        end_time = time.time() + self.timeout
+        self.defer(
+            timeout=self.execution_timeout,
+            trigger=DbtCloudRunJobTrigger(
+                run_id=self.run_id,
+                conn_id=self.dbt_cloud_conn_id,
+                account_id=self.account_id,
+                poll_interval=self.poll_interval,
+                end_time=end_time,
+            ),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: Context, event: Dict[str, Any]) -> int:

Review Comment:
   ```suggestion
       def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
   ```
   PEP 563 should handle this.
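
A small illustration of the point about PEP 563, assuming the module opts in with `from __future__ import annotations`. The function below is a free-standing stand-in for the sensor method, not the code from the PR.

```python
from __future__ import annotations  # PEP 563: annotations are not evaluated at definition time

from typing import Any


def execute_complete(context: dict[str, Any], event: dict[str, Any]) -> int:
    """Stand-in callback that only demonstrates the annotation style."""
    return len(event)


# With the future import, annotations are stored as plain strings, so the
# built-in generic ``dict[str, Any]`` is never subscripted at runtime.
event_annotation = execute_complete.__annotations__["event"]
```

Because the annotation is kept as a string, `dict[str, Any]` is usable even on interpreters where subscripting the built-in `dict` would otherwise fail, which is why `typing.Dict` is not needed here.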





[GitHub] [airflow] phanikumv commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1115299464


##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,

Review Comment:
   fixed





[GitHub] [airflow] phanikumv commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1115297396


##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::

Review Comment:
   fixed





[GitHub] [airflow] phanikumv commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1115301175


##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,
+        timeout: float = 60 * 60 * 24 * 7,
+        **kwargs: Any,
+    ):
+        self.poll_interval = poll_interval
+        self.timeout = timeout
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> None:
+        """Defers to Trigger class to poll for state of the job run until
+        it reaches a failure state or success state"""
+        end_time = time.time() + self.timeout
+        self.defer(
+            timeout=self.execution_timeout,
+            trigger=DbtCloudRunJobTrigger(
+                run_id=self.run_id,
+                conn_id=self.dbt_cloud_conn_id,
+                account_id=self.account_id,
+                poll_interval=self.poll_interval,
+                end_time=end_time,
+            ),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: Context, event: Dict[str, Any]) -> int:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
+        if event["status"] in ["error", "cancelled"]:
+            raise AirflowException(event["message"])

Review Comment:
   fixed





[GitHub] [airflow] phanikumv commented on pull request #29695: Add `DbtCloudJobRunAsyncSensor`

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on PR #29695:
URL: https://github.com/apache/airflow/pull/29695#issuecomment-1441452142

   > It would be great to add unit tests for this as well.
   
   added unit tests




[GitHub] [airflow] dimberman commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

Posted by "dimberman (via GitHub)" <gi...@apache.org>.
dimberman commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1115132610


##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,
+        timeout: float = 60 * 60 * 24 * 7,
+        **kwargs: Any,
+    ):
+        self.poll_interval = poll_interval
+        self.timeout = timeout
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> None:
+        """Defers to Trigger class to poll for state of the job run until
+        it reaches a failure state or success state"""
+        end_time = time.time() + self.timeout
+        self.defer(
+            timeout=self.execution_timeout,
+            trigger=DbtCloudRunJobTrigger(
+                run_id=self.run_id,
+                conn_id=self.dbt_cloud_conn_id,
+                account_id=self.account_id,
+                poll_interval=self.poll_interval,
+                end_time=end_time,
+            ),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: Context, event: Dict[str, Any]) -> int:
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
+        if event["status"] in ["error", "cancelled"]:
+            raise AirflowException(event["message"])

Review Comment:
   Should we wrap this error? Something like "Error in dbt: {event[message]}"
   
   That way users know that the error is not based in Airflow and is instead in dbt.
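
A hedged sketch of the wrapping suggested here. `AirflowException` is redefined locally as a stand-in so the snippet runs without Airflow installed, the function is free-standing rather than a sensor method, and the `run_id` key in the success event is an assumption because the quoted diff stops at the `raise`.

```python
from typing import Any, Dict


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException."""


def execute_complete(event: Dict[str, Any]) -> int:
    """Raise a dbt-prefixed error for failed runs, otherwise return the run id."""
    if event["status"] in ["error", "cancelled"]:
        # Prefix the message so users can tell the failure came from dbt
        # rather than from Airflow itself.
        raise AirflowException(f"Error in dbt: {event['message']}")
    # Assumed payload shape: the trigger event carries the run id on success.
    return int(event["run_id"])
```

Called with `{"status": "error", "message": "boom"}`, this raises `AirflowException("Error in dbt: boom")`.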





[GitHub] [airflow] phanikumv commented on a diff in pull request #29695: Add `DbtCloudJobRunAsyncSensor`

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv commented on code in PR #29695:
URL: https://github.com/apache/airflow/pull/29695#discussion_r1115300047


##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,
+        timeout: float = 60 * 60 * 24 * 7,
+        **kwargs: Any,
+    ):
+        self.poll_interval = poll_interval
+        self.timeout = timeout
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> None:
+        """Defers to Trigger class to poll for state of the job run until
+        it reaches a failure state or success state"""

Review Comment:
   fixed



##########
airflow/providers/dbt/cloud/sensors/dbt.py:
##########
@@ -64,3 +72,56 @@ def poke(self, context: Context) -> bool:
             raise DbtCloudJobRunException(f"Job run {self.run_id} has been cancelled.")
 
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
+
+
+class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
+    """
+    Checks the status of a dbt Cloud job run asynchronously.
+
+    .. seealso::
+        For more information on sync Sensor DbtCloudJobRunAsyncSensor, take a look at the guide::
+        :ref:`howto/operator:DbtCloudJobRunAsyncSensor`
+
+    :param dbt_cloud_conn_id: The connection identifier for connecting to dbt Cloud.
+    :param run_id: The job run identifier.
+    :param account_id: The dbt Cloud account identifier.
+    :param timeout: Time in seconds to wait for a job run to reach a terminal status. Defaults to 7 days.
+    """
+
+    def __init__(
+        self,
+        *,
+        poll_interval: float = 5,
+        timeout: float = 60 * 60 * 24 * 7,
+        **kwargs: Any,
+    ):
+        self.poll_interval = poll_interval
+        self.timeout = timeout
+        super().__init__(**kwargs)
+
+    def execute(self, context: Context) -> None:
+        """Defers to Trigger class to poll for state of the job run until
+        it reaches a failure state or success state"""
+        end_time = time.time() + self.timeout
+        self.defer(
+            timeout=self.execution_timeout,
+            trigger=DbtCloudRunJobTrigger(
+                run_id=self.run_id,
+                conn_id=self.dbt_cloud_conn_id,
+                account_id=self.account_id,
+                poll_interval=self.poll_interval,
+                end_time=end_time,
+            ),
+            method_name="execute_complete",
+        )
+
+    def execute_complete(self, context: Context, event: Dict[str, Any]) -> int:

Review Comment:
   fixed





[GitHub] [airflow] kaxil merged pull request #29695: Add `DbtCloudJobRunAsyncSensor`

Posted by "kaxil (via GitHub)" <gi...@apache.org>.
kaxil merged PR #29695:
URL: https://github.com/apache/airflow/pull/29695

