Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/21 19:53:54 UTC

[GitHub] [airflow] eskarimov opened a new pull request #19736: Add Databricks Deferrable Operators

eskarimov opened a new pull request #19736:
URL: https://github.com/apache/airflow/pull/19736


   closes: #18999 
   
   The PR intends to add deferrable versions of `DatabricksSubmitRunOperator` and `DatabricksRunNowOperator`, which use the [new Airflow functionality introduced in version 2.2.0](https://airflow.apache.org/docs/apache-airflow/2.2.0/concepts/deferring.html#deferrable-operators-triggers).
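   For illustration, a minimal sketch of how one of the new operators could be used in a DAG (the connection id, cluster spec, and notebook path below are placeholders):
    ```python
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import (
        DatabricksSubmitRunDeferrableOperator,
    )

    with DAG(
        dag_id='example_databricks_deferrable',
        start_date=datetime(2021, 11, 1),
        schedule_interval=None,
    ) as dag:
        # Submits the run, then defers; a triggerer polls Databricks until the
        # run reaches a terminal state, freeing the worker slot in the meantime.
        notebook_run = DatabricksSubmitRunDeferrableOperator(
            task_id='notebook_run',
            databricks_conn_id='databricks_default',
            json={
                'new_cluster': {'spark_version': '9.1.x-scala2.12', 'num_workers': 2},
                'notebook_task': {'notebook_path': '/Users/someone@example.com/quickstart'},
            },
        )
    ```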
   
   There aren't many examples of deferrable operators at the moment, so I'd appreciate it if we could discuss the following:
   - Should we update the existing operators or create new ones?
     If we update the existing operators right away, we'll break backward compatibility for Airflow versions prior to 2.2.0, so I thought it'd be better to introduce new operators for now.
   - Execution timeout for deferrable operators is not handled correctly at the moment; I will investigate separately under #19382 
   - If password authentication is used, the password is shown in the logs of the triggerer process; I will need to check this. When using token-based auth it's masked correctly. 
   - You may notice that the regular (non-deferrable) operators use the shared function `_handle_databricks_operator_execution` to handle the logic of interacting with the Databricks API. For the newly created operators, however, I'd prefer to sacrifice DRY a bit and keep the logic entirely within the `execute` method, for better readability and flexibility.





[GitHub] [airflow] andrewgodwin commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
andrewgodwin commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r758707622



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)

Review comment:
       The problem with turning on async safety for everything by default is that you need something like the decorator to be in the middle of the call to do the check; I'm not sure we can reasonably override every Airflow API function without some seriously dark magic.







[GitHub] [airflow] chinwobble commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
chinwobble commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r776524338



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()

Review comment:
       @andrewgodwin Another issue I encountered with deferrable operators is that task duration during deferral is not counted.
   When I run this operator, it says all my DAG's tasks finish in 1 sec.







[GitHub] [airflow] potiuk commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r758656398



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)

Review comment:
       Hmm. Maybe there is a way to at least flag Airflow's blocking methods when this happens? Or find a way to forbid them when we are in an async context @andrewgodwin ?
   
   If you recall, this is the very case I was afraid would be too easy to "slip through" once people start making more use of deferred operators and use them in their custom ones, which do not pass through the watchful eyes of you and the other committers of Airflow. 







[GitHub] [airflow] eskarimov commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1015229080


   Sorry to bother you, I just wanted to check whether there's anything I could do here to improve the PR.
   
   Trying to summarise the points discussed:
   - Blocking call to fetch secrets/connection details with `self.get_connection()` - the current implementation wraps `databricks_conn()` in the `cached_property` decorator, called during `DatabricksHook` initialization; I'm not sure if we should change anything here now.
   - Provide a warning if a deferrable operator is used in Airflow prior to 2.2. An alternative option is to mark the provider as Airflow 2.2+ compatible; any input here is very welcome.
   
   I've also changed the initial approach for creating async hook methods - at first I thought about creating a `DatabricksHookAsync` class as a subclass of `DatabricksHook`, to avoid duplicating code and to have a clear separation between the sync and async hooks, but then I decided it'd be wrong to make it a subclass, as we can't simply replace usages of `DatabricksHook` with `DatabricksHookAsync`.
   Creating and supporting an independent `DatabricksHookAsync` class seems non-optimal as well. 
   So I ended up creating async versions of the regular functions inside `DatabricksHook`; a sketch of the resulting shape is below. Please let me know if there's a better way to do that.
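   To make that concrete, here's a rough, self-contained sketch of the resulting shape (the class and the `a_` naming convention are illustrative, not the actual implementation):
    ```python
    import asyncio


    class ExampleHook:
        """Illustrative skeleton: sync and async variants side by side on one hook."""

        def get_run_state(self, run_id: int) -> str:
            # Blocking version, used by the regular (non-deferrable) operators.
            return 'RUNNING'

        async def a_get_run_state(self, run_id: int) -> str:
            # Async counterpart on the same class, used by the triggers.
            await asyncio.sleep(0)  # stands in for an awaited aiohttp call
            return 'RUNNING'
    ```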
   





[GitHub] [airflow] potiuk commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r758722206



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)

Review comment:
       Yeah. Basically you'd have to intercept every method call; while possible, it would be a terrible performance penalty.







[GitHub] [airflow] potiuk commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r758686143



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)

Review comment:
       Or maybe we could invert the logic :) and assume that, by default, any Airflow call is unsafe (unless decorated with `@async_safe`). 
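   For illustration, a rough sketch of one way that inversion could work; `async_safe`, `guard_blocking`, and the mechanism itself are hypothetical:
    ```python
    import asyncio
    import functools


    def async_safe(func):
        # Hypothetical marker: this callable is explicitly safe in async code.
        func._async_safe = True
        return func


    def guard_blocking(func):
        # Hypothetical default guard: treat every unmarked call as blocking
        # and refuse to run it while an event loop is active in this thread.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not getattr(func, '_async_safe', False):
                try:
                    asyncio.get_running_loop()
                except RuntimeError:
                    pass  # no running loop, blocking is fine
                else:
                    raise RuntimeError(f'{func.__qualname__} would block the event loop')
            return func(*args, **kwargs)
        return wrapper
    ```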







[GitHub] [airflow] eskarimov commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1082643980


   Resolved conflicts, rebased PR.





[GitHub] [airflow] jplauri commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
jplauri commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r754526756



##########
File path: airflow/providers/databricks/operators/databricks.py
##########
@@ -343,6 +315,45 @@ def on_kill(self):
         self.log.info('Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id)
 
 
+class DatabricksSubmitRunDeferrableOperator(DatabricksSubmitRunOperator):
+    """Deferrable version of ``DatabricksSubmitRunOperator``"""
+
+    def execute(self, context: Optional[dict]):
+        hook = self._get_hook()
+
+        self.run_id = hook.submit_run(self.json)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=self.run_id)
+        self.log.info(f'Run submitted with run_id: {self.run_id}')
+
+        run_page_url = hook.get_run_page_url(self.run_id)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)
+        self.log.info(f'View run status, Spark UI, and logs at {run_page_url}')
+
+        self.defer(
+            trigger=DatabricksExecutionTrigger(
+                run_id=self.run_id,
+                databricks_conn_id=self.databricks_conn_id,
+                polling_period_seconds=self.polling_period_seconds,
+            ),
+            method_name=DEFER_METHOD_NAME,
+        )
+
+    def execute_complete(self, context: Optional[dict], event: dict):
+        validate_trigger_event(event)
+        run_state = RunState.from_json(event['run_state'])
+        run_page_url = event['run_page_url']
+        self.log.info('View run status, Spark UI, and logs at %s', run_page_url)

Review comment:
       For consistency, should this also be an f-string?







[GitHub] [airflow] andrewgodwin commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
andrewgodwin commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r759631853



##########
File path: airflow/providers/databricks/operators/databricks.py
##########
@@ -343,6 +315,45 @@ def on_kill(self):
         self.log.info('Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id)
 
 
+class DatabricksSubmitRunDeferrableOperator(DatabricksSubmitRunOperator):
+    """Deferrable version of ``DatabricksSubmitRunOperator``"""
+
+    def execute(self, context: Optional[dict]):
+        hook = self._get_hook()
+
+        self.run_id = hook.submit_run(self.json)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=self.run_id)
+        self.log.info(f'Run submitted with run_id: {self.run_id}')
+
+        run_page_url = hook.get_run_page_url(self.run_id)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)
+        self.log.info(f'View run status, Spark UI, and logs at {run_page_url}')
+
+        self.defer(
+            trigger=DatabricksExecutionTrigger(
+                run_id=self.run_id,
+                databricks_conn_id=self.databricks_conn_id,
+                polling_period_seconds=self.polling_period_seconds,
+            ),
+            method_name=DEFER_METHOD_NAME,
+        )
+
+    def execute_complete(self, context: Optional[dict], event: dict):
+        validate_trigger_event(event)
+        run_state = RunState.from_json(event['run_state'])
+        run_page_url = event['run_page_url']
+        self.log.info('View run status, Spark UI, and logs at %s', run_page_url)
+
+        if run_state.is_successful:
+            self.log.info('Job run completed successfully.')
+            return
+        else:
+            error_message = f'Job run failed with terminal state: {run_state}'
+            raise AirflowException(error_message)
+
+
 class DatabricksRunNowOperator(BaseOperator):

Review comment:
       Ah yes, I can see how that might leave things in a strange state. We may need to add code to do better cleanup when a task is killed-while-deferred.










[GitHub] [airflow] potiuk commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1002569512


   > Thank you for this point, true, the deferrable operators functionality is only available starting 2.2 version.
   
   Thank our CI :) It simply caught this!





[GitHub] [airflow] uranusjr commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r754981584



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()

Review comment:
       > > you must move the abstraction to the trigger instead.
   > 
   > My understanding is that when there're multiple `triggerer` processes, each trigger will be executed as a separate async task with its own `ClientSession` instance. So even if we move it to `DatabricksExecutionTrigger`, it'd still create a session for each trigger run.
   
   Sorry, I think I meant to say trigger**er** instead of trigger 🙂 
   
   Your general summary is correct though; I don’t think it’s particularly worthwhile to make that optimisation right now.







[GitHub] [airflow] andrewgodwin commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
andrewgodwin commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r758578414



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)

Review comment:
       Watch out - this is a blocking call, as it does a database fetch/secrets backend connection behind the scenes. You need to wrap it in something to make it async-compatible.
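   For illustration, a minimal sketch of one way to do that wrapping, assuming the `asgiref` package is available (the `_a_get_connection` helper is hypothetical):
    ```python
    from asgiref.sync import sync_to_async


    class AsyncHookSketch:
        """Illustrative only: offload the blocking connection lookup to a thread."""

        def get_connection(self, conn_id):
            ...  # the existing blocking lookup (metadata DB / secrets backend)

        async def _a_get_connection(self, conn_id):
            # Run the blocking call in a worker thread so the triggerer's
            # event loop is not blocked while the connection is fetched.
            return await sync_to_async(self.get_connection)(conn_id)
    ```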







[GitHub] [airflow] eskarimov commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1068516938


   Rebased the PR and resolved the conflicts.





[GitHub] [airflow] eskarimov commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-993422955


   Updated and rebased the PR, implementing new auth methods for Azure AD, and making improvements based on the comments in the PR.








[GitHub] [airflow] uranusjr commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r753892600



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()

Review comment:
       Note that hooks are not necessarily run in the same process, so if you want to share sessions among them, you must move the abstraction to the trigger instead.
   
   I wonder how viable it'd be to refactor the synchronous version (`DatabricksHook`) into a [sans I/O](https://sans-io.readthedocs.io/) base class that can be used by `DatabricksExecutionTrigger`, instead of implementing a new, entirely separate hook for it.
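   To make the sans I/O idea concrete, a rough sketch under the assumption that the protocol logic can be factored out (all names here are hypothetical):
    ```python
    from typing import Any, Dict, Tuple


    class DatabricksApiCore:
        """Hypothetical sans I/O core: builds requests and interprets responses,
        but performs no network or database I/O itself."""

        def __init__(self, host: str) -> None:
            self.host = host

        def prepare_request(self, endpoint_info: Tuple[str, str], json: dict) -> Tuple[str, str, dict]:
            method, endpoint = endpoint_info
            return method, f'https://{self.host}/{endpoint}', json

        def interpret_response(self, status: int, body: Dict[str, Any]) -> Dict[str, Any]:
            if status != 200:
                raise RuntimeError(f'API call failed with status {status}: {body}')
            return body
    ```
   The synchronous hook could then drive this core with `requests`, and the trigger with `aiohttp`, without duplicating the request-building logic.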

##########
File path: setup.py
##########
@@ -244,6 +244,8 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
 ]
 databricks = [
     'requests>=2.26.0, <3',
+    'aiohttp>=3.6.3, <4',
+    'asynctest~=0.13',

Review comment:
       This should not be part of the provider extra; it should go in the `tests` extra instead.







[GitHub] [airflow] jplauri commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
jplauri commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r754525579



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)
+
+        auth = None
+        headers = {}
+        if 'token' in self.databricks_conn.extra_dejson:
+            self.log.info('Using token auth. ')
+            headers["Authorization"] = f'Bearer {self.databricks_conn.extra_dejson["token"]}'
+            if 'host' in self.databricks_conn.extra_dejson:
+                host = self._parse_host(self.databricks_conn.extra_dejson['host'])
+            else:
+                host = self.databricks_conn.host
+        else:
+            self.log.info('Using basic auth. ')
+            auth = aiohttp.BasicAuth(self.databricks_conn.login, self.databricks_conn.password)
+            host = self.databricks_conn.host

Review comment:
       Completely superficial, but is the whitespace after "auth." intended?







[GitHub] [airflow] alexott commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
alexott commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r754266283



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -351,31 +362,31 @@ def _do_api_call(self, endpoint_info, json):
     def _log_request_error(self, attempt_num: int, error: str) -> None:
         self.log.error('Attempt %s API Request to Databricks failed with reason: %s', attempt_num, error)
 
-    def run_now(self, json: dict) -> str:
+    def run_now(self, json: dict) -> int:
         """
         Utility function to call the ``api/2.0/jobs/run-now`` endpoint.
 
         :param json: The data used in the body of the request to the ``run-now`` endpoint.
         :type json: dict
         :return: the run_id as a string
-        :rtype: str
+        :rtype: int
         """
         response = self._do_api_call(RUN_NOW_ENDPOINT, json)
         return response['run_id']
 
-    def submit_run(self, json: dict) -> str:
+    def submit_run(self, json: dict) -> int:

Review comment:
       We need to be careful with types, as there will soon be a change switching to 64-bit IDs.







[GitHub] [airflow] eskarimov commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r759627482



##########
File path: airflow/providers/databricks/operators/databricks.py
##########
@@ -343,6 +315,45 @@ def on_kill(self):
         self.log.info('Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id)
 
 
+class DatabricksSubmitRunDeferrableOperator(DatabricksSubmitRunOperator):
+    """Deferrable version of ``DatabricksSubmitRunOperator``"""
+
+    def execute(self, context: Optional[dict]):
+        hook = self._get_hook()
+
+        self.run_id = hook.submit_run(self.json)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=self.run_id)
+        self.log.info(f'Run submitted with run_id: {self.run_id}')
+
+        run_page_url = hook.get_run_page_url(self.run_id)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)
+        self.log.info(f'View run status, Spark UI, and logs at {run_page_url}')
+
+        self.defer(
+            trigger=DatabricksExecutionTrigger(
+                run_id=self.run_id,
+                databricks_conn_id=self.databricks_conn_id,
+                polling_period_seconds=self.polling_period_seconds,
+            ),
+            method_name=DEFER_METHOD_NAME,
+        )
+
+    def execute_complete(self, context: Optional[dict], event: dict):
+        validate_trigger_event(event)
+        run_state = RunState.from_json(event['run_state'])
+        run_page_url = event['run_page_url']
+        self.log.info('View run status, Spark UI, and logs at %s', run_page_url)
+
+        if run_state.is_successful:
+            self.log.info('Job run completed successfully.')
+            return
+        else:
+            error_message = f'Job run failed with terminal state: {run_state}'
+            raise AirflowException(error_message)
+
+
 class DatabricksRunNowOperator(BaseOperator):

Review comment:
       I've just tried playing with the deferrable operator's behaviour when a task is killed, and it looks like there might be a bug.
   If a task is killed while being executed by the `Triggerer`, there's no log available for the task after the kill. Later on, if the same task is started again, it finishes immediately, as if it had continued from where it was deferred. I will raise an issue with a detailed description of how to reproduce it.







[GitHub] [airflow] eskarimov commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1044158093


   Resolved the conflicts and rebased the PR.





[GitHub] [airflow] uranusjr commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r753939551



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()

Review comment:
       They could, yes, because async triggers are designed to run in the same process; that's why they are async. If they each ran in their own process, there'd be no point in using asyncio in the first place. But the difficulty is who should manage that shared session. I wonder if @andrewgodwin actually thought about this preemptively 😛 










[GitHub] [airflow] potiuk commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1003426676


   > `DatabricksExecutionTrigger` [is inherited from `BaseTrigger`](https://github.com/apache/airflow/blob/7893d94e1502fd1f46e7732f08c04fb1c194a06d/airflow/providers/databricks/triggers/databricks.py#L26). So if we just log a warning on importing Airflow 2.2+ classes, the CI check will still fail, because `BaseTrigger` and `TriggerEvent` won't be defined.
   
    Not really. We can - in this case - set `BaseTrigger` and `TriggerEvent` to None and add the warning to the "known warnings". As long as those classes are not actually 'run' while the module is parsed (they should not be), it will work just fine.
    
    We've done that in other places:
   
   ```
   try:
       from kubernetes.client import models as k8s
   except ImportError:
       log.warning(
           "The example_kubernetes_executor example DAG requires the kubernetes provider."
           " Please install it with: pip install apache-airflow[cncf.kubernetes]"
       )
       k8s = None
   ```
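    Applied to this provider, the same pattern might look roughly like this (a sketch, not a final implementation):
    ```
    import logging

    log = logging.getLogger(__name__)

    try:
        from airflow.triggers.base import BaseTrigger, TriggerEvent
    except ImportError:
        # Airflow < 2.2: keep the module importable, but the deferrable
        # operators and triggers will not be usable on this version.
        log.warning("Databricks deferrable operators require Airflow 2.2+.")
        BaseTrigger = None
        TriggerEvent = None
    ```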








[GitHub] [airflow] andrewgodwin commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
andrewgodwin commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r758576866



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()

Review comment:
       Yes, there is no easy mechanism for pooling connections that are all running under a single triggerer right now, but you can also auto-pool from an async hook implementation by detecting same-thread different-coroutine if you need to. Just makes it quite a bit more complex.







[GitHub] [airflow] chinwobble commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
chinwobble commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r753877719



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()

Review comment:
       This is really good stuff.
   
    Currently you are using one hook per trigger. That means if you had 32 concurrent triggers, they would all have their own client sessions.
    I've had a look at the aiohttp docs, which say:
   ```
   it is suggested you use a single session for the lifetime of your application to benefit from connection pooling.
   ```
   https://docs.aiohttp.org/en/stable/client_reference.html
   
   What do you think?
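   For illustration, one possible shape for a per-process shared session (the helper is hypothetical, and session cleanup is glossed over):
    ```python
    from typing import Optional

    import aiohttp

    _SHARED_SESSION: Optional[aiohttp.ClientSession] = None


    async def get_shared_session() -> aiohttp.ClientSession:
        # Lazily create one ClientSession per triggerer process and reuse it
        # across all triggers, to benefit from aiohttp's connection pooling.
        global _SHARED_SESSION
        if _SHARED_SESSION is None or _SHARED_SESSION.closed:
            _SHARED_SESSION = aiohttp.ClientSession()
        return _SHARED_SESSION
    ```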







[GitHub] [airflow] chinwobble commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
chinwobble commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r753936415



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()

Review comment:
       Ideally each `Triggerer` process would share one session. A DAG can start multiple `DatabricksExecutionTrigger`s at the same time. Could these triggers all share the same session?







[GitHub] [airflow] eskarimov commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r754526457



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -351,31 +362,31 @@ def _do_api_call(self, endpoint_info, json):
     def _log_request_error(self, attempt_num: int, error: str) -> None:
         self.log.error('Attempt %s API Request to Databricks failed with reason: %s', attempt_num, error)
 
-    def run_now(self, json: dict) -> str:
+    def run_now(self, json: dict) -> int:
         """
         Utility function to call the ``api/2.0/jobs/run-now`` endpoint.
 
         :param json: The data used in the body of the request to the ``run-now`` endpoint.
         :type json: dict
         :return: the run_id as a string
-        :rtype: str
+        :rtype: int
         """
         response = self._do_api_call(RUN_NOW_ENDPOINT, json)
         return response['run_id']
 
-    def submit_run(self, json: dict) -> str:
+    def submit_run(self, json: dict) -> int:

Review comment:
       I was just referring to the Databricks API docs, where it's an int.
   I think it should be handled correctly; at least I've checked that the types are aligned everywhere.







[GitHub] [airflow] alexott commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
alexott commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r754267058



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)
+
+        auth = None
+        headers = {}
+        if 'token' in self.databricks_conn.extra_dejson:
+            self.log.info('Using token auth. ')
+            headers["Authorization"] = f'Bearer {self.databricks_conn.extra_dejson["token"]}'
+            if 'host' in self.databricks_conn.extra_dejson:
+                host = self._parse_host(self.databricks_conn.extra_dejson['host'])
+            else:
+                host = self.databricks_conn.host
+        else:
+            self.log.info('Using basic auth. ')
+            auth = aiohttp.BasicAuth(self.databricks_conn.login, self.databricks_conn.password)
+            host = self.databricks_conn.host

Review comment:
       This needs to be updated to support additional authentication methods introduced recently







[GitHub] [airflow] chinwobble commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
chinwobble commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r755019613



##########
File path: airflow/providers/databricks/operators/databricks.py
##########
@@ -343,6 +315,45 @@ def on_kill(self):
         self.log.info('Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id)
 
 
+class DatabricksSubmitRunDeferrableOperator(DatabricksSubmitRunOperator):
+    """Deferrable version of ``DatabricksSubmitRunOperator``"""
+
+    def execute(self, context: Optional[dict]):
+        hook = self._get_hook()
+
+        self.run_id = hook.submit_run(self.json)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=self.run_id)
+        self.log.info(f'Run submitted with run_id: {self.run_id}')
+
+        run_page_url = hook.get_run_page_url(self.run_id)
+        if self.do_xcom_push:
+            context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)
+        self.log.info(f'View run status, Spark UI, and logs at {run_page_url}')
+
+        self.defer(
+            trigger=DatabricksExecutionTrigger(
+                run_id=self.run_id,
+                databricks_conn_id=self.databricks_conn_id,
+                polling_period_seconds=self.polling_period_seconds,
+            ),
+            method_name=DEFER_METHOD_NAME,
+        )
+
+    def execute_complete(self, context: Optional[dict], event: dict):
+        validate_trigger_event(event)
+        run_state = RunState.from_json(event['run_state'])
+        run_page_url = event['run_page_url']
+        self.log.info('View run status, Spark UI, and logs at %s', run_page_url)
+
+        if run_state.is_successful:
+            self.log.info('Job run completed successfully.')
+            return
+        else:
+            error_message = f'Job run failed with terminal state: {run_state}'
+            raise AirflowException(error_message)
+
+
 class DatabricksRunNowOperator(BaseOperator):

Review comment:
       Maybe slightly outside the scope of this particular pull request, but it would be nice to implement the `on_kill` method too.
   When a task is cancelled or marked as failed, it should also cancel the running Databricks job run.
   To do this, you would need to push the run_id to XCom and retrieve it in the `on_kill` method. I'm not sure if this is possible.
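   
   A rough sketch of the shape this could take, reusing the hook's existing `cancel_run` and the operator's `_get_hook` helper; whether `on_kill` runs on an instance that still has `run_id` set after deferral is exactly the open question:
   
   ```python
   def on_kill(self):
       # Sketch only: assumes run_id was stored on the operator instance in
       # execute() before deferring, and that on_kill sees the same instance.
       run_id = getattr(self, 'run_id', None)
       if run_id is None:
           self.log.warning('No run_id available; cannot cancel the Databricks run.')
           return
       hook = self._get_hook()
       hook.cancel_run(run_id)
       self.log.info(
           'Task: %s with run_id: %s was requested to be cancelled.', self.task_id, run_id
       )
   ```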







[GitHub] [airflow] eskarimov commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r754529168



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)
+
+        auth = None
+        headers = {}
+        if 'token' in self.databricks_conn.extra_dejson:
+            self.log.info('Using token auth. ')
+            headers["Authorization"] = f'Bearer {self.databricks_conn.extra_dejson["token"]}'
+            if 'host' in self.databricks_conn.extra_dejson:
+                host = self._parse_host(self.databricks_conn.extra_dejson['host'])
+            else:
+                host = self.databricks_conn.host
+        else:
+            self.log.info('Using basic auth. ')
+            auth = aiohttp.BasicAuth(self.databricks_conn.login, self.databricks_conn.password)
+            host = self.databricks_conn.host

Review comment:
       Thanks for pointing this out! I plan to refactor `DatabricksHook`, sharing as much functionality as possible between the sync and async versions, to avoid maintaining two similar classes in the future.







[GitHub] [airflow] chinwobble commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
chinwobble commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r753877719



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()

Review comment:
       This is really good stuff.
   
   Currently you are using one Hook per trigger. That means if you had 32 concurrent triggers, then they would all have their own client sessions.
   I've had a look at the aiohttp docs, which say:
   
   > it is suggested you use a single session for the lifetime of your application to benefit from connection pooling.
   
   https://docs.aiohttp.org/en/stable/client_reference.html
   
   What do you think?
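   
   One possible shape for this - a minimal sketch, assuming all triggers in a triggerer process run on the same event loop (the helper name is illustrative, not part of the hook):
   
   ```python
   from typing import Optional
   
   import aiohttp
   
   # Illustrative module-level cache: one session shared by every trigger
   # running in the same triggerer process / event loop.
   _shared_session: Optional[aiohttp.ClientSession] = None
   
   
   async def get_shared_session() -> aiohttp.ClientSession:
       """Lazily create (or re-create after close) the shared session."""
       global _shared_session
       if _shared_session is None or _shared_session.closed:
           _shared_session = aiohttp.ClientSession()
       return _shared_session
   ```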







[GitHub] [airflow] eskarimov commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-974884364


   @chinwobble, kindly requesting a review :)





[GitHub] [airflow] andrewgodwin commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
andrewgodwin commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r758683470



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)

Review comment:
       Indeed. We could always lift what we did with Django for this very problem: https://github.com/django/django/blob/37d9ea5d5c010d54a416417399344c39f4e9f93e/django/utils/asyncio.py#L8
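   
   For reference, a stripped-down sketch of that decorator idea (the env-var name and exception type here are illustrative, not Django's exact ones):
   
   ```python
   import asyncio
   import os
   from functools import wraps
   
   
   def async_unsafe(message):
       """Mark a blocking function as unsafe to call from a running event loop."""
   
       def decorator(func):
           @wraps(func)
           def inner(*args, **kwargs):
               # Refuse to run blocking code inside a running event loop,
               # unless explicitly allowed via an escape-hatch env var.
               if not os.environ.get('ALLOW_ASYNC_UNSAFE'):
                   try:
                       asyncio.get_running_loop()
                   except RuntimeError:
                       pass  # no running loop in this thread: safe to call
                   else:
                       raise RuntimeError(message)
               return func(*args, **kwargs)
   
           return inner
   
       # Allow bare usage: @async_unsafe without arguments.
       if callable(message):
           func, message = message, 'You cannot call this from an async context.'
           return decorator(func)
       return decorator
   ```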







[GitHub] [airflow] potiuk commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1066204353


   Some changes have been merged, so the conflicts need to be resolved.





[GitHub] [airflow] eskarimov commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1003305921


   Thinking out loud about the second approach of trying and catching exceptions:
   when we catch the `ImportError` on importing the non-existent classes `airflow.triggers.base.BaseTrigger` and `airflow.triggers.base.TriggerEvent`, what would be the best way to handle it?
   
   `DatabricksExecutionTrigger` [is inherited from `BaseTrigger`](https://github.com/apache/airflow/blob/7893d94e1502fd1f46e7732f08c04fb1c194a06d/airflow/providers/databricks/triggers/databricks.py#L26). So if we just log a warning when importing the Airflow 2.2+ classes fails, the CI check will still fail, because `BaseTrigger` and `TriggerEvent` won't be defined.
   
   I was thinking about using an abstract class as a substitute for the non-existent classes if `ImportError` is raised, but I'm not sure it'd be the right approach.
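   
   A minimal sketch of that substitute-class idea, assuming we only need the module to import on Airflow < 2.2 and fail loudly at use time (the stub shape is illustrative):
   
   ```python
   try:
       from airflow.triggers.base import BaseTrigger, TriggerEvent
   except ImportError:
       # Airflow < 2.2: provide stand-ins so this module still imports;
       # instantiating a trigger then fails with a clear message.
       class BaseTrigger:  # type: ignore[no-redef]
           def __init__(self, *args, **kwargs):
               raise RuntimeError('Deferrable operators require Airflow 2.2+')
   
       class TriggerEvent:  # type: ignore[no-redef]
           def __init__(self, *args, **kwargs):
               raise RuntimeError('Deferrable operators require Airflow 2.2+')
   ```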





[GitHub] [airflow] eskarimov commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r754525681



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()

Review comment:
       Thank you both so much, those are very good points.
   
   Trying to summarise:
   - Ideally all tasks executed by each `triggerer` should share the same session - however, I'm not sure it'd be possible without touching the core Airflow `Triggerer` functionality.
   - > Note that hooks are not necessarily run in the same process, so if you want to share sessions among them, you must move the abstraction to the trigger instead.
     
     Does this mean moving the `ClientSession` initialisation to the trigger, i.e. as a property of `DatabricksExecutionTrigger`?
     My understanding is that when there are multiple `triggerer` processes, each trigger will be executed as a separate async task with its own `ClientSession` instance. So even if we move it to `DatabricksExecutionTrigger`, it'd still create a session for each trigger run.
   - Refactoring the Databricks hook - I agree it'd be the ideal solution to keep everything inside a single class. Also, after [the comment](https://github.com/apache/airflow/pull/19736#discussion_r754267058) by @alexott there's even more motivation to do that, to prevent doing double work in the future. I'll try to refactor it to minimise repeated code and isolate the IO operations.







[GitHub] [airflow] jplauri commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
jplauri commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r754530593



##########
File path: airflow/providers/databricks/utils/databricks.py
##########
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from typing import Union
+
+from airflow.exceptions import AirflowException
+from airflow.providers.databricks.hooks.databricks import RunState
+
+
+def deep_string_coerce(content, json_path: str = 'json') -> Union[str, list, dict]:
+    """
+    Coerces content or all values of content if it is a dict to a string. The
+    function will throw if content contains non-string or non-numeric types.
+
+    The reason why we have this function is because the ``self.json`` field must be a
+    dict with only string values. This is because ``render_template`` will fail
+    for numerical values.
+    """
+    coerce = deep_string_coerce
+    if isinstance(content, str):
+        return content
+    elif isinstance(
+        content,
+        (
+            int,
+            float,
+        ),
+    ):
+        # Databricks can tolerate either numeric or string types in the API backend.
+        return str(content)
+    elif isinstance(content, (list, tuple)):
+        return [coerce(e, f'{json_path}[{i}]') for i, e in enumerate(content)]
+    elif isinstance(content, dict):
+        return {k: coerce(v, f'{json_path}[{k}]') for k, v in list(content.items())}
+    else:
+        param_type = type(content)
+        msg = f'Type {param_type} used for parameter {json_path} is not a number or a string'
+        raise AirflowException(msg)
+
+
+def validate_trigger_event(event: dict):
+    """Validates correctness of the event received
+    from :class:`~airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger
+    """
+    keys_to_check = ['run_id', 'run_page_url', 'run_state']
+    for key in keys_to_check:
+        if not event.get(key):

Review comment:
       I think it's a tiny bit simpler to do `if key not in event` than to use get.
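   
   Worth noting the two checks also differ for falsy values, which matters if a field such as `run_id` could legitimately be `0`:
   
   ```python
   event = {'run_id': 0}
   'run_id' not in event    # False - the key is present
   not event.get('run_id')  # True - 0 is falsy, so this check would reject a present key
   ```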







[GitHub] [airflow] potiuk commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1052611814


   Tests are still failing





[GitHub] [airflow] eskarimov commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1061157161


   Resolved the conflicts and adjusted to the changes recently introduced in the Databricks provider package.





[GitHub] [airflow] potiuk commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1084658964


   I do not know enough about deferrable operators yet to comment, but at least from a quick look it looks good. @andrewgodwin?





[GitHub] [airflow] eskarimov commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1002513558


   > This operator will not import cleanly on Airflow 2.1 (see the errors in tests). `airflow.triggers` is not available in Airflow 2.1.
   > 
   > There are two approaches we can take:
   > 
   > 1. make the Databricks provider Airflow 2.2+
   > 2. try/catch the import errors and provide a warning that the deferrable operators will only work on Airflow 2.2+
   > 
   > I am for the 2nd solution, as there are quite a few non-deferrable operators there that should continue to work. But I would love to hear what others think.
   
   Thank you for this point - it's true that the deferrable operators functionality is only available starting with version 2.2.
   
   The second approach with try/catch of the import errors sounds right to me.
   
   I'd be glad to work on it if others agree on this approach.





[GitHub] [airflow] potiuk commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1002172319


   This operator will not import cleanly on Airflow 2.1 (see the errors in tests). `airflow.triggers` is not available in Airflow 2.1.
   
   There are two approaches we can take:
   
   1) make the Databricks provider Airflow 2.2+
   2) try/catch the import errors and provide a warning that the deferrable operators will only work on Airflow 2.2+
   
   I am for the 2nd solution, as there are quite a few non-deferrable operators there that should continue to work. But I would love to hear what others think.
   





[GitHub] [airflow] potiuk commented on a change in pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r758685358



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)

Review comment:
       :heart: it. I think there are a number of "likely to use" methods in Airflow that we could mark as `async_unsafe` with decorators, anticipating that they will cause trouble.







[GitHub] [airflow] eskarimov commented on pull request #19736: Add Databricks Deferrable Operators

Posted by GitBox <gi...@apache.org>.
eskarimov commented on pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#issuecomment-1004653152


   I've rebased the PR and resolved the conflicts, adapted the operators/hooks to the recent changes, and added a warning when deferrable operators are used on Airflow versions prior to 2.2.

