Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/28 08:13:27 UTC

[GitHub] [airflow] eskarimov commented on a change in pull request #19835: Refactor DatabricksHook

eskarimov commented on a change in pull request #19835:
URL: https://github.com/apache/airflow/pull/19835#discussion_r757859224



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -64,10 +66,12 @@
 class RunState:
     """Utility class for the run state concept of Databricks runs."""
 
-    def __init__(self, life_cycle_state: str, result_state: str, state_message: str) -> None:
+    def __init__(
+        self, life_cycle_state: str, state_message: str, result_state: str = None, *args, **kwargs

Review comment:
    This needs to be reviewed together with the changes for initialising the class instance from the API response:
   
   Current version:
   ```python
           state = response['state']
           life_cycle_state = state['life_cycle_state']
           # result_state may not be in the state if not terminal
           result_state = state.get('result_state', None)
           state_message = state['state_message']
           return RunState(life_cycle_state, result_state, state_message)
   ```
   Proposed version:
   ```python
           state = response['state']
           return RunState(**state)
   ```
   
    The current version is essentially an intermediate layer between the API response and the class: it extracts values from the response and then initialises the class instance. But the response already represents a state, so why do we need this layer at all?
    I see the following drawbacks:
    - The class description doesn't mention that `result_state` may be missing when the state is not terminal; currently this is only documented by a comment buried in the code.
    - It encourages code duplication: if we introduce an async class for `DatabricksHook`, this parsing logic has to be written twice. And if we later change the class, say by adding a new property `user_cancelled_or_timedout` (which is already part of the API response), we'd have to update the class arguments, the response-parsing logic, and the class instance initialisation everywhere it's used.
      With the proposed version, we only need to change the class arguments (see the sketch at the end of this comment).
   
    With all the above in mind, answering the questions:
    - > why do we need *args, **kwargs ?
    
        It signals that `RunState` may receive additional init arguments (since we have no control over the API response); see the example above with `user_cancelled_or_timedout` in the response.
   
   - > why to change order of the parameters? They are logical now: lifecycle -> result state -> state message.
   
        Purely because of Python syntax: arguments with default values must come after required arguments.
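    
    For illustration, here's a minimal sketch of what the proposed signature enables; the attribute assignments and the sample response are assumptions for the example, not the actual PR code (`result_state` is typed with `Optional` here for correctness):
    ```python
    from typing import Optional
    
    
    class RunState:
        """Utility class for the run state concept of Databricks runs."""
    
        def __init__(
            self, life_cycle_state: str, state_message: str, result_state: Optional[str] = None,
            *args, **kwargs
        ) -> None:
            # result_state may be absent when the run is not in a terminal state
            self.life_cycle_state = life_cycle_state
            self.result_state = result_state
            self.state_message = state_message
    
    
    # Initialise straight from the API response; extra fields such as
    # 'user_cancelled_or_timedout' are simply absorbed by **kwargs:
    response = {
        'state': {
            'life_cycle_state': 'TERMINATED',
            'result_state': 'SUCCESS',
            'state_message': '',
            'user_cancelled_or_timedout': False,
        }
    }
    run_state = RunState(**response['state'])
    ```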

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -235,21 +241,18 @@ def _get_aad_token(self, resource: str) -> str:
             attempt_num += 1
             sleep(self.retry_delay)
 
-    def _fill_aad_tokens(self, headers: dict) -> str:
+    def _fill_aad_headers(self, headers: dict) -> dict:
         """
-        Fills headers if necessary (SPN is outside of the workspace) and generates AAD token
+        Fills AAD headers if necessary (SPN is outside of the workspace)
         :param headers: dictionary with headers to fill-in
-        :return: AAD token
+        :return: dictionary with filled AAD headers
         """
-        # SP is outside of the workspace
-        if 'azure_resource_id' in self.databricks_conn.extra_dejson:

Review comment:
    What do you think about calling it `_get_aad_headers()`, which would return either an empty dict or a filled one? We also wouldn't need the input arg `headers` in that case.
   
   Then we could construct headers like:
   ```python
   aad_headers = self._get_aad_headers()
   headers = {**USER_AGENT_HEADER.copy(), **aad_headers}
   ```
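    
    For illustration, a sketch of such a helper; the header names and `AZURE_MANAGEMENT_ENDPOINT` mirror what the hook already uses for AAD auth, but treat the exact details as assumptions:
    ```python
    def _get_aad_headers(self) -> dict:
        """
        Builds AAD headers if necessary (SPN is outside of the workspace).
    
        :return: dictionary with filled AAD headers, or an empty dict
        """
        headers = {}
        # SP is outside of the workspace
        if 'azure_resource_id' in self.databricks_conn.extra_dejson:
            mgmt_token = self._get_aad_token(resource=AZURE_MANAGEMENT_ENDPOINT)
            headers['X-Databricks-Azure-Workspace-Resource-Id'] = self.databricks_conn.extra_dejson[
                'azure_resource_id'
            ]
            headers['X-Databricks-Azure-SP-Management-Token'] = mgmt_token
        return headers
    ```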
   

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -356,31 +353,31 @@ def _do_api_call(self, endpoint_info, json):
     def _log_request_error(self, attempt_num: int, error: str) -> None:
         self.log.error('Attempt %s API Request to Databricks failed with reason: %s', attempt_num, error)
 
-    def run_now(self, json: dict) -> str:
+    def run_now(self, json: dict) -> int:

Review comment:
    This won't break existing code; in fact, it's the other way round: if someone assumed the output is `str` because of the function signature, their code would already be broken, because the actual returned type is `int`.
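    
    For reference, a sketch of the method with the corrected annotation (assuming the existing `RUN_NOW_ENDPOINT` constant; the Jobs `run-now` API returns the run id as an integer):
    ```python
    def run_now(self, json: dict) -> int:
        """
        Utility function to call the ``api/2.0/jobs/run-now`` endpoint.
    
        :param json: the JSON payload of the run-now request
        :return: the run_id as an int
        """
        response = self._do_api_call(RUN_NOW_ENDPOINT, json)
        return response['run_id']
    ```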

##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -522,6 +515,20 @@ def uninstall(self, json: dict) -> None:
         """
         self._do_api_call(UNINSTALL_LIBS_ENDPOINT, json)
 
+    @staticmethod
+    def _is_aad_token_valid(aad_token: dict) -> bool:

Review comment:
    - Mainly for readability: it hides the details of checking that the token is valid inside a dedicated function, since that isn't the main purpose of the parent function `_get_aad_token`.
    - There's a mistake in the current implementation: it subtracts `TOKEN_REFRESH_LEAD_TIME` from the current time, while it should actually add it. With this change we fix the bug and cover the function with tests (see the sketch below).
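    
    A sketch of the fixed check; it assumes the cached token dict carries an `expires_on` field in epoch seconds, as AAD token responses do, and that `TOKEN_REFRESH_LEAD_TIME` is also in seconds:
    ```python
    import time
    
    TOKEN_REFRESH_LEAD_TIME = 120  # refresh this many seconds before expiry (assumed value)
    
    
    class DatabricksHook:  # trimmed to the relevant method
        @staticmethod
        def _is_aad_token_valid(aad_token: dict) -> bool:
            """Checks that the AAD token exists and won't expire within the refresh window."""
            now = int(time.time())
            # The lead time is *added* to the current time: the token counts as expired
            # once we are inside the refresh window, forcing a refresh early enough.
            return bool(aad_token) and aad_token['expires_on'] > (now + TOKEN_REFRESH_LEAD_TIME)
    ```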



