You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/08/27 20:59:31 UTC

[airflow] branch main updated: Secrets backend failover (#16404)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0abbd2d  Secrets backend failover (#16404)
0abbd2d is described below

commit 0abbd2d918ad9027948fd8a33ebb42487e4aa000
Author: Faisal <fh...@users.noreply.github.com>
AuthorDate: Fri Aug 27 15:59:15 2021 -0500

    Secrets backend failover (#16404)
    
    Currently, if an alternative secrets backend raises any kind of connection-related error, Airflow does not go on to check the default secrets backends (environment variables and the metastore database), so the affected tasks fail. This change logs the error and fails over to checking the default backends instead.
    
    Additionally, the GCP Secret Manager backend causes the Airflow webserver to crash at startup if credentials for the backend cannot be found. This behavior appears to be unique to GCP Secret Manager; this PR addresses it so that all backends behave consistently when credentials are missing.
    
    closes: #14592
---
 airflow/configuration.py         | 16 ++++++++++++----
 airflow/models/connection.py     | 17 ++++++++++++++---
 airflow/models/variable.py       | 15 +++++++++++----
 tests/core/test_configuration.py | 37 +++++++++++++++++++++++++++++++++++++
 4 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index 23767cb..8897545 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -81,10 +81,18 @@ def run_command(command):
 
 def _get_config_value_from_secret_backend(config_key):
     """Get Config option values from Secret Backend"""
-    secrets_client = get_custom_secret_backend()
-    if not secrets_client:
-        return None
-    return secrets_client.get_config(config_key)
+    try:  # also guards backend construction (e.g. missing credentials), not just the lookup
+        secrets_client = get_custom_secret_backend()
+        if not secrets_client:
+            return None
+        return secrets_client.get_config(config_key)
+    except Exception as e:  # pylint: disable=broad-except
+        raise AirflowConfigException(
+            'Cannot retrieve config from alternative secrets backend. '
+            'Make sure it is configured properly and that the Backend '
+            'is accessible.\n'
+            f'{e}'
+        ) from e
 
 
 def _default_config_file_path(file_name: str):
diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index e74a70f..027e59a 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -17,6 +17,7 @@
 # under the License.
 
 import json
+import logging
 import warnings
 from json import JSONDecodeError
 from typing import Dict, Optional, Union
@@ -35,6 +36,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.log.secrets_masker import mask_secret
 from airflow.utils.module_loading import import_string
 
+log = logging.getLogger(__name__)  # reports secrets-backend lookup failures
+
 
 def parse_netloc_to_hostname(*args, **kwargs):
     """This method is deprecated."""
@@ -393,7 +396,15 @@ class Connection(Base, LoggingMixin):
         :return: connection
         """
         for secrets_backend in ensure_secrets_loaded():
-            conn = secrets_backend.get_connection(conn_id=conn_id)
-            if conn:
-                return conn
+            try:  # a failing backend must not stop the remaining backends from being checked
+                conn = secrets_backend.get_connection(conn_id=conn_id)
+                if conn:
+                    return conn
+            except Exception:  # pylint: disable=broad-except
+                log.exception(
+                    'Unable to retrieve connection from secrets backend (%s). '
+                    'Checking subsequent secrets backend.',
+                    type(secrets_backend).__name__,
+                )
+
         raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
diff --git a/airflow/models/variable.py b/airflow/models/variable.py
index 7d47269..b11194c 100644
--- a/airflow/models/variable.py
+++ b/airflow/models/variable.py
@@ -33,7 +33,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.log.secrets_masker import mask_secret
 from airflow.utils.session import provide_session
 
-log = logging.getLogger()
+log = logging.getLogger(__name__)  # module logger instead of the root logger
 
 
 class Variable(Base, LoggingMixin):
@@ -201,7 +201,14 @@ class Variable(Base, LoggingMixin):
         :return: Variable Value
         """
         for secrets_backend in ensure_secrets_loaded():
-            var_val = secrets_backend.get_variable(key=key)
-            if var_val is not None:
-                return var_val
+            try:  # a failing backend must not stop the remaining backends from being checked
+                var_val = secrets_backend.get_variable(key=key)
+                if var_val is not None:
+                    return var_val
+            except Exception:  # pylint: disable=broad-except
+                log.exception(
+                    'Unable to retrieve variable from secrets backend (%s). '
+                    'Checking subsequent secrets backend.',
+                    type(secrets_backend).__name__,
+                )
         return None
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index b356ba6..470e63f 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -251,6 +251,43 @@ sql_alchemy_conn = airflow
 
         assert 'sqlite:////Users/airflow/airflow/airflow.db' == test_conf.get('test', 'sql_alchemy_conn')
 
+    @mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    @conf_vars(
+        {
+            ("secrets", "backend"): "airflow.providers.hashicorp.secrets.vault.VaultBackend",
+            ("secrets", "backend_kwargs"): '{"url": "http://127.0.0.1:8200", "token": "token"}',
+        }
+    )
+    def test_config_raise_exception_from_secret_backend_connection_error(self, mock_hvac):
+        """Raise AirflowConfigException when the secrets backend cannot be reached"""
+
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        # Simulate an unreachable Vault server: reading any secret raises a connection error.
+        mock_client.secrets.kv.v2.read_secret_version.side_effect = ConnectionError("Connection refused")
+
+        test_config = '''[test]
+sql_alchemy_conn_secret = sql_alchemy_conn
+'''
+        test_config_default = '''[test]
+sql_alchemy_conn = airflow
+'''
+        test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default))
+        test_conf.read_string(test_config)
+        test_conf.sensitive_config_values = test_conf.sensitive_config_values | {
+            ('test', 'sql_alchemy_conn'),
+        }
+
+        with pytest.raises(
+            AirflowConfigException,
+            match=re.escape(
+                'Cannot retrieve config from alternative secrets backend. '
+                'Make sure it is configured properly and that the Backend '
+                'is accessible.'
+            ),
+        ):
+            test_conf.get('test', 'sql_alchemy_conn')
+
     def test_getboolean(self):
         """Test AirflowConfigParser.getboolean"""
         test_config = """