You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/10 18:31:41 UTC
[airflow] branch main updated: Databricks: Correctly handle HTTP exception (#22885)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aa8c08db38 Databricks: Correctly handle HTTP exception (#22885)
aa8c08db38 is described below
commit aa8c08db383ebfabf30a7c2b2debb64c0968df48
Author: Alex Ott <al...@gmail.com>
AuthorDate: Sun Apr 10 20:31:29 2022 +0200
Databricks: Correctly handle HTTP exception (#22885)
The HTTP 404 error returned for a non-existent repo wasn't handled correctly
by the Databricks Repos operations; the lookup now returns None in that case.
---
airflow/providers/databricks/hooks/databricks.py | 20 +++++++++++++-------
.../providers/databricks/hooks/databricks_base.py | 14 ++++++++++++--
2 files changed, 25 insertions(+), 9 deletions(-)
diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py
index ffa77570d5..7911660412 100644
--- a/airflow/providers/databricks/hooks/databricks.py
+++ b/airflow/providers/databricks/hooks/databricks.py
@@ -27,6 +27,8 @@ or the ``api/2.1/jobs/runs/submit``
"""
from typing import Any, Dict, List, Optional
+from requests import exceptions as requests_exceptions
+
from airflow.exceptions import AirflowException
from airflow.providers.databricks.hooks.databricks_base import BaseDatabricksHook
@@ -364,12 +366,16 @@ class DatabricksHook(BaseDatabricksHook):
def get_repo_by_path(self, path: str) -> Optional[str]:
"""
-
- :param path:
- :return:
- """
- result = self._do_api_call(WORKSPACE_GET_STATUS_ENDPOINT, {'path': path})
- if result.get('object_type', '') == 'REPO':
- return str(result['object_id'])
+ Obtains Repos ID by path
+ :param path: path to a repository
+ :return: Repos ID if it exists, None if doesn't.
+ """
+ try:
+ result = self._do_api_call(WORKSPACE_GET_STATUS_ENDPOINT, {'path': path}, wrap_http_errors=False)
+ if result.get('object_type', '') == 'REPO':
+ return str(result['object_id'])
+ except requests_exceptions.HTTPError as e:
+ if e.response.status_code != 404:
+ raise e
return None
diff --git a/airflow/providers/databricks/hooks/databricks_base.py b/airflow/providers/databricks/hooks/databricks_base.py
index 1a418fd04e..6e0f1b44d8 100644
--- a/airflow/providers/databricks/hooks/databricks_base.py
+++ b/airflow/providers/databricks/hooks/databricks_base.py
@@ -307,7 +307,12 @@ class BaseDatabricksHook(BaseHook):
def _log_request_error(self, attempt_num: int, error: str) -> None:
self.log.error('Attempt %s API Request to Databricks failed with reason: %s', attempt_num, error)
- def _do_api_call(self, endpoint_info: Tuple[str, str], json: Optional[Dict[str, Any]] = None):
+ def _do_api_call(
+ self,
+ endpoint_info: Tuple[str, str],
+ json: Optional[Dict[str, Any]] = None,
+ wrap_http_errors: bool = True,
+ ):
"""
Utility function to perform an API call with retries
@@ -362,7 +367,12 @@ class BaseDatabricksHook(BaseHook):
except RetryError:
raise AirflowException(f'API requests to Databricks failed {self.retry_limit} times. Giving up.')
except requests_exceptions.HTTPError as e:
- raise AirflowException(f'Response: {e.response.content}, Status Code: {e.response.status_code}')
+ if wrap_http_errors:
+ raise AirflowException(
+ f'Response: {e.response.content}, Status Code: {e.response.status_code}'
+ )
+ else:
+ raise e
@staticmethod
def _get_error_code(exception: BaseException) -> str: