You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/22 18:13:49 UTC

[GitHub] [airflow] jplauri commented on a change in pull request #19736: Add Databricks Deferrable Operators

jplauri commented on a change in pull request #19736:
URL: https://github.com/apache/airflow/pull/19736#discussion_r754525579



##########
File path: airflow/providers/databricks/hooks/databricks.py
##########
@@ -493,3 +504,120 @@ def __init__(self, token: str) -> None:
     def __call__(self, r: PreparedRequest) -> PreparedRequest:
         r.headers['Authorization'] = 'Bearer ' + self.token
         return r
+
+
+class DatabricksAsyncHook(DatabricksHook):
+    """
+    Async version of the ``DatabricksHook``
+    Implements only necessary methods used further in Databricks Triggers.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    async def __aenter__(self):
+        self._session = aiohttp.ClientSession()
+        return self
+
+    async def __aexit__(self, *err):
+        await self._session.close()
+        self._session = None
+
+    async def _do_api_call(self, endpoint_info: Tuple[str, str], json: dict) -> dict:
+        """
+        Utility function to perform an async API call with retries
+
+        :param endpoint_info: Tuple of method and endpoint
+        :type endpoint_info: tuple[string, string]
+        :param json: Parameters for this API call.
+        :type json: dict
+        :return: If the api call returns a OK status code,
+            this function returns the response in JSON. Otherwise, throw an AirflowException.
+        :rtype: dict
+        """
+        method, endpoint = endpoint_info
+
+        self.databricks_conn = self.get_connection(self.databricks_conn_id)
+
+        auth = None
+        headers = {}
+        if 'token' in self.databricks_conn.extra_dejson:
+            self.log.info('Using token auth. ')
+            headers["Authorization"] = f'Bearer {self.databricks_conn.extra_dejson["token"]}'
+            if 'host' in self.databricks_conn.extra_dejson:
+                host = self._parse_host(self.databricks_conn.extra_dejson['host'])
+            else:
+                host = self.databricks_conn.host
+        else:
+            self.log.info('Using basic auth. ')
+            auth = aiohttp.BasicAuth(self.databricks_conn.login, self.databricks_conn.password)
+            host = self.databricks_conn.host

Review comment:
       Completely superficial, but is the whitespace after "auth." intended?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org