You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/15 16:06:47 UTC
[airflow] 11/28: Get Airflow configs with sensitive data from
Secret Backends (#9645)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 313f61e0007314769cabdebbf98d9d8aa63bfe26
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Wed Jul 8 13:29:54 2020 +0100
Get Airflow configs with sensitive data from Secret Backends (#9645)
(cherry picked from commit 2f31b3060ed8274d5d1b1db7349ce607640b9199)
---
airflow/configuration.py | 73 ++++++++++++++++++++++----
airflow/contrib/secrets/aws_secrets_manager.py | 21 +++++++-
airflow/contrib/secrets/hashicorp_vault.py | 22 +++++++-
airflow/secrets/__init__.py | 43 +++++++++------
airflow/secrets/base_secrets.py | 16 ++++--
airflow/secrets/metastore.py | 5 +-
docs/howto/set-config.rst | 29 ++++++++--
tests/test_configuration.py | 50 ++++++++++++++++--
8 files changed, 215 insertions(+), 44 deletions(-)
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 8720d77..290843f 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -102,6 +102,15 @@ def run_command(command):
return output
+def _get_config_value_from_secret_backend(config_key):
+ """Get Config option values from Secret Backend"""
+ from airflow import secrets
+ secrets_client = secrets.get_custom_secret_backend()
+ if not secrets_client:
+ return None
+ return secrets_client.get_config(config_key)
+
+
def _read_default_config_file(file_name):
templates_dir = os.path.join(os.path.dirname(__file__), 'config_templates')
file_path = os.path.join(templates_dir, file_name)
@@ -136,7 +145,9 @@ class AirflowConfigParser(ConfigParser):
# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}__cmd" pattern, the idea behind this
# is to not store password on boxes in text files.
- as_command_stdout = {
+ # These configs can also be fetched from Secrets backend
+ # following the "{section}__{name}__secret" pattern
+ sensitive_config_values = {
('core', 'sql_alchemy_conn'),
('core', 'fernet_key'),
('celery', 'broker_url'),
@@ -275,19 +286,32 @@ class AirflowConfigParser(ConfigParser):
env_var_cmd = env_var + '_CMD'
if env_var_cmd in os.environ:
# if this is a valid command key...
- if (section, key) in self.as_command_stdout:
+ if (section, key) in self.sensitive_config_values:
return run_command(os.environ[env_var_cmd])
+ # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
+ env_var_secret_path = env_var + '_SECRET'
+ if env_var_secret_path in os.environ:
+ # if this is a valid secret path...
+ if (section, key) in self.sensitive_config_values:
+ return _get_config_value_from_secret_backend(os.environ[env_var_secret_path])
def _get_cmd_option(self, section, key):
fallback_key = key + '_cmd'
# if this is a valid command key...
- if (section, key) in self.as_command_stdout:
- if super(AirflowConfigParser, self) \
- .has_option(section, fallback_key):
- command = super(AirflowConfigParser, self) \
- .get(section, fallback_key)
+ if (section, key) in self.sensitive_config_values:
+ if super(AirflowConfigParser, self).has_option(section, fallback_key):
+ command = super(AirflowConfigParser, self).get(section, fallback_key)
return run_command(command)
+ def _get_secret_option(self, section, key):
+ """Get Config option values from Secret Backend"""
+ fallback_key = key + '_secret'
+ # if this is a valid secret key...
+ if (section, key) in self.sensitive_config_values:
+ if super(AirflowConfigParser, self).has_option(section, fallback_key):
+ secrets_path = super(AirflowConfigParser, self).get(section, fallback_key)
+ return _get_config_value_from_secret_backend(secrets_path)
+
def get(self, section, key, **kwargs):
section = str(section).lower()
key = str(key).lower()
@@ -329,6 +353,16 @@ class AirflowConfigParser(ConfigParser):
self._warn_deprecate(section, key, deprecated_name)
return option
+ # ...then from secret backends
+ option = self._get_secret_option(section, key)
+ if option:
+ return option
+ if deprecated_name:
+ option = self._get_secret_option(section, deprecated_name)
+ if option:
+ self._warn_deprecate(section, key, deprecated_name)
+ return option
+
# ...then the default config
if self.airflow_defaults.has_option(section, key) or 'fallback' in kwargs:
return expand_env_var(
@@ -466,7 +500,8 @@ class AirflowConfigParser(ConfigParser):
def as_dict(
self, display_source=False, display_sensitive=False, raw=False,
- include_env=True, include_cmds=True):
+ include_env=True, include_cmds=True, include_secret=True
+ ):
"""
Returns the current configuration as an OrderedDict of OrderedDicts.
@@ -488,6 +523,12 @@ class AirflowConfigParser(ConfigParser):
set (True, default), or should the _cmd options be left as the
command to run (False)
:type include_cmds: bool
+ :param include_secret: Should the result of calling any *_secret config be
+ set (True, default), or should the _secret options be left as the
+ path to get the secret from (False)
+ :type include_secret: bool
+ :return: Dictionary, where the key is the name of the section and the content is
+ the dictionary with the name of the parameter and its value.
"""
cfg = {}
configs = [
@@ -529,7 +570,7 @@ class AirflowConfigParser(ConfigParser):
# add bash commands
if include_cmds:
- for (section, key) in self.as_command_stdout:
+ for (section, key) in self.sensitive_config_values:
opt = self._get_cmd_option(section, key)
if opt:
if not display_sensitive:
@@ -541,6 +582,20 @@ class AirflowConfigParser(ConfigParser):
cfg.setdefault(section, OrderedDict()).update({key: opt})
del cfg[section][key + '_cmd']
+ # add config from secret backends
+ if include_secret:
+ for (section, key) in self.sensitive_config_values:
+ opt = self._get_secret_option(section, key)
+ if opt:
+ if not display_sensitive:
+ opt = '< hidden >'
+ if display_source:
+ opt = (opt, 'secret')
+ elif raw:
+ opt = opt.replace('%', '%%')
+ cfg.setdefault(section, OrderedDict()).update({key: opt})
+ del cfg[section][key + '_secret']
+
return cfg
def load_test_config(self):
diff --git a/airflow/contrib/secrets/aws_secrets_manager.py b/airflow/contrib/secrets/aws_secrets_manager.py
index 7896dac..4df9bdb 100644
--- a/airflow/contrib/secrets/aws_secrets_manager.py
+++ b/airflow/contrib/secrets/aws_secrets_manager.py
@@ -42,8 +42,11 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
For example, if secrets prefix is ``airflow/connections/smtp_default``, this would be accessible
if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``.
- And if variables prefix is ``airflow/variables/hello``, this would be accessible
+ If variables prefix is ``airflow/variables/hello``, this would be accessible
if you provide ``{"variables_prefix": "airflow/variables"}`` and request variable key ``hello``.
+ And if config_prefix is ``airflow/config/sql_alchemy_conn``, this would be accessible
+ if you provide ``{"config_prefix": "airflow/config"}`` and request config
+ key ``sql_alchemy_conn``.
You can also pass additional keyword arguments like ``aws_secret_access_key``, ``aws_access_key_id``
or ``region_name`` to this class and they would be passed on to Boto3 client.
@@ -52,6 +55,8 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
:type connections_prefix: str
:param variables_prefix: Specifies the prefix of the secret to read to get Variables.
:type variables_prefix: str
+ :param config_prefix: Specifies the prefix of the secret to read to get Configurations.
+ :type config_prefix: str
:param profile_name: The name of a profile to use. If not given, then the default profile is used.
:type profile_name: str
:param sep: separator used to concatenate secret_prefix and secret_id. Default: "/"
@@ -62,6 +67,7 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
self,
connections_prefix='airflow/connections', # type: str
variables_prefix='airflow/variables', # type: str
+ config_prefix='airflow/config', # type: str
profile_name=None, # type: Optional[str]
sep="/", # type: str
**kwargs
@@ -69,6 +75,7 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
super(SecretsManagerBackend, self).__init__(**kwargs)
self.connections_prefix = connections_prefix.rstrip("/")
self.variables_prefix = variables_prefix.rstrip('/')
+ self.config_prefix = config_prefix.rstrip('/')
self.profile_name = profile_name
self.sep = sep
self.kwargs = kwargs
@@ -96,13 +103,23 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
def get_variable(self, key):
# type: (str) -> Optional[str]
"""
- Get Airflow Variable from Environment Variable
+ Get Airflow Variable
:param key: Variable Key
:return: Variable Value
"""
return self._get_secret(self.variables_prefix, key)
+ def get_config(self, key):
+ # type: (str) -> Optional[str]
+ """
+ Get Airflow Configuration
+
+ :param key: Configuration Option Key
+ :return: Configuration Option Value
+ """
+ return self._get_secret(self.config_prefix, key)
+
def _get_secret(self, path_prefix, secret_id):
# type: (str, str) -> Optional[str]
"""
diff --git a/airflow/contrib/secrets/hashicorp_vault.py b/airflow/contrib/secrets/hashicorp_vault.py
index 8f6b53c..536e7f9 100644
--- a/airflow/contrib/secrets/hashicorp_vault.py
+++ b/airflow/contrib/secrets/hashicorp_vault.py
@@ -55,6 +55,9 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
:param variables_path: Specifies the path of the secret to read to get Variables.
(default: 'variables')
:type variables_path: str
+ :param config_path: Specifies the path of the secret to read Airflow Configurations
+ (default: 'config').
+ :type config_path: str
:param url: Base URL for the Vault instance being addressed.
:type url: str
:param auth_type: Authentication Type for Vault (one of 'token', 'ldap', 'userpass', 'approle',
@@ -89,6 +92,7 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
self,
connections_path='connections', # type: str
variables_path='variables', # type: str
+ config_path='config', # type: str
url=None, # type: Optional[str]
auth_type='token', # type: str
mount_point='secret', # type: str
@@ -104,9 +108,10 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
gcp_scopes=None, # type: Optional[str]
**kwargs
):
- super(VaultBackend, self).__init__(**kwargs)
+ super(VaultBackend, self).__init__()
self.connections_path = connections_path.rstrip('/')
self.variables_path = variables_path.rstrip('/')
+ self.config_path = config_path.rstrip('/')
self.url = url
self.auth_type = auth_type
self.kwargs = kwargs
@@ -179,7 +184,7 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
def get_variable(self, key):
# type: (str) -> Optional[str]
"""
- Get Airflow Variable from Environment Variable
+ Get Airflow Variable
:param key: Variable Key
:return: Variable Value
@@ -213,3 +218,16 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
return_data = response["data"] if self.kv_engine_version == 1 else response["data"]["data"]
return return_data
+
+ def get_config(self, key):
+ # type: (str) -> Optional[str]
+ """
+ Get Airflow Configuration
+
+ :param key: Configuration Option Key
+ :type key: str
+ :rtype: str
+ :return: Configuration Option Value retrieved from the vault
+ """
+ response = self._get_secret(self.config_path, key)
+ return response.get("value") if response else None
diff --git a/airflow/secrets/__init__.py b/airflow/secrets/__init__.py
index 57dde4b..8736ac6 100644
--- a/airflow/secrets/__init__.py
+++ b/airflow/secrets/__init__.py
@@ -22,17 +22,20 @@ Secrets framework provides means of getting connection objects from various sour
* Metatsore database
* AWS SSM Parameter store
"""
-__all__ = ['BaseSecretsBackend', 'get_connections', 'get_variable']
+__all__ = ['BaseSecretsBackend', 'get_connections', 'get_variable', 'get_custom_secret_backend']
import json
-from typing import List, Optional
+from typing import TYPE_CHECKING, List, Optional
from airflow.configuration import conf
from airflow.exceptions import AirflowException
-from airflow.models.connection import Connection
from airflow.secrets.base_secrets import BaseSecretsBackend
from airflow.utils.module_loading import import_string
+if TYPE_CHECKING:
+ from airflow.models.connection import Connection
+
+
CONFIG_SECTION = "secrets"
DEFAULT_SECRETS_SEARCH_PATH = [
"airflow.secrets.environment_variables.EnvironmentVariablesBackend",
@@ -41,7 +44,7 @@ DEFAULT_SECRETS_SEARCH_PATH = [
def get_connections(conn_id):
- # type: (str) -> List[Connection]
+ # type: (str) -> List['Connection']
"""
Get all connections as an iterable.
@@ -72,25 +75,35 @@ def get_variable(key):
return None
+def get_custom_secret_backend():
+ # type: (...) -> Optional[BaseSecretsBackend]
+ """Get Secret Backend if defined in airflow.cfg"""
+ alternative_secrets_backend = conf.get(section=CONFIG_SECTION, key='backend', fallback='')
+
+ if alternative_secrets_backend:
+ try:
+ alternative_secrets_config_dict = json.loads(
+ conf.get(section=CONFIG_SECTION, key='backend_kwargs', fallback='{}')
+ )
+ except ValueError:
+ alternative_secrets_config_dict = {}
+ secrets_backend_cls = import_string(alternative_secrets_backend)
+ return secrets_backend_cls(**alternative_secrets_config_dict)
+ return None
+
+
def initialize_secrets_backends():
# type: (...) -> List[BaseSecretsBackend]
"""
* import secrets backend classes
* instantiate them and return them in a list
"""
- alternative_secrets_backend = conf.get(section=CONFIG_SECTION, key='backend', fallback='')
- try:
- alternative_secrets_config_dict = json.loads(
- conf.get(section=CONFIG_SECTION, key='backend_kwargs', fallback='{}')
- )
- except ValueError:
- alternative_secrets_config_dict = {}
-
backend_list = []
- if alternative_secrets_backend:
- secrets_backend_cls = import_string(alternative_secrets_backend)
- backend_list.append(secrets_backend_cls(**alternative_secrets_config_dict))
+ custom_secret_backend = get_custom_secret_backend()
+
+ if custom_secret_backend is not None:
+ backend_list.append(custom_secret_backend)
for class_name in DEFAULT_SECRETS_SEARCH_PATH:
secrets_backend_cls = import_string(class_name)
diff --git a/airflow/secrets/base_secrets.py b/airflow/secrets/base_secrets.py
index 2394f40..a8c0e6b 100644
--- a/airflow/secrets/base_secrets.py
+++ b/airflow/secrets/base_secrets.py
@@ -16,9 +16,7 @@
# under the License.
from abc import ABCMeta
-from typing import List, Optional
-
-from airflow.models.connection import Connection
+from typing import Optional
class BaseSecretsBackend:
@@ -56,13 +54,13 @@ class BaseSecretsBackend:
raise NotImplementedError()
def get_connections(self, conn_id):
- # type: (str) -> List[Connection]
"""
Get connections with a specific ID
:param conn_id: connection id
:type conn_id: str
"""
+ from airflow.models.connection import Connection
conn_uri = self.get_conn_uri(conn_id=conn_id)
if not conn_uri:
return []
@@ -78,3 +76,13 @@ class BaseSecretsBackend:
:return: Variable Value
"""
raise NotImplementedError()
+
+ def get_config(self, key): # pylint: disable=unused-argument
+ # type: (str) -> Optional[str]
+ """
+ Return value for Airflow Config Key
+
+ :param key: Config Key
+ :return: Config Value
+ """
+ return None
diff --git a/airflow/secrets/metastore.py b/airflow/secrets/metastore.py
index 51d8740..f1412e9 100644
--- a/airflow/secrets/metastore.py
+++ b/airflow/secrets/metastore.py
@@ -19,9 +19,6 @@
Objects relating to sourcing connections from metastore database
"""
-from typing import List
-
-from airflow.models.connection import Connection
from airflow.secrets import BaseSecretsBackend
from airflow.utils.db import provide_session
@@ -34,7 +31,7 @@ class MetastoreBackend(BaseSecretsBackend):
# pylint: disable=missing-docstring
@provide_session
def get_connections(self, conn_id, session=None):
- # type: (...) -> List[Connection]
+ from airflow.models.connection import Connection
conn_list = session.query(Connection).filter(Connection.conn_id == conn_id).all()
session.expunge_all()
return conn_list
diff --git a/docs/howto/set-config.rst b/docs/howto/set-config.rst
index 035bc29..270fd7f 100644
--- a/docs/howto/set-config.rst
+++ b/docs/howto/set-config.rst
@@ -47,7 +47,21 @@ the key like this:
[core]
sql_alchemy_conn_cmd = bash_command_to_run
-The following config options support this ``_cmd`` version:
+You can also derive the connection string at run time by appending ``_secret`` to
+the key like this:
+
+.. code-block:: ini
+
+ [core]
+ sql_alchemy_conn_secret = sql_alchemy_conn
+ # You can also add a nested path
+ # example:
+ # sql_alchemy_conn_secret = core/sql_alchemy_conn
+
+This will retrieve the config option from a Secrets Backend, e.g. HashiCorp Vault. See
+:ref:`Secrets Backends<secrets_backend_configuration>` for more details.
+
+The following config options support this ``_cmd`` and ``_secret`` version:
* ``sql_alchemy_conn`` in ``[core]`` section
* ``fernet_key`` in ``[core]`` section
@@ -66,12 +80,21 @@ the same way the usual config options can. For example:
export AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD=bash_command_to_run
+Similarly, ``_secret`` config options can also be set using a corresponding environment variable.
+For example:
+
+.. code-block:: bash
+
+ export AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET=sql_alchemy_conn
+
The idea behind this is to not store passwords on boxes in plain text files.
The universal order of precedence for all configuration options is as follows:
-#. set as an environment variable
-#. set as a command environment variable
+#. set as an environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN``)
+#. set as a command environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD``)
+#. set as a secret environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET``)
#. set in ``airflow.cfg``
#. command in ``airflow.cfg``
+#. secret key in ``airflow.cfg``
#. Airflow's built in defaults
diff --git a/tests/test_configuration.py b/tests/test_configuration.py
index 5c40cad..fcf7964 100644
--- a/tests/test_configuration.py
+++ b/tests/test_configuration.py
@@ -191,7 +191,7 @@ key6 = value6
test_conf = AirflowConfigParser(
default_config=parameterized_config(TEST_CONFIG_DEFAULT))
test_conf.read_string(TEST_CONFIG)
- test_conf.as_command_stdout = test_conf.as_command_stdout | {
+ test_conf.sensitive_config_values = test_conf.sensitive_config_values | {
('test', 'key2'),
('test', 'key4'),
}
@@ -225,6 +225,46 @@ key6 = value6
self.assertNotIn('key4', cfg_dict['test'])
self.assertEqual('printf key4_result', cfg_dict['test']['key4_cmd'])
+ @mock.patch("airflow.contrib.secrets.hashicorp_vault.hvac")
+ @conf_vars({
+ ("secrets", "backend"): "airflow.contrib.secrets.hashicorp_vault.VaultBackend",
+ ("secrets", "backend_kwargs"): '{"url": "http://127.0.0.1:8200", "token": "token"}',
+ })
+ def test_config_from_secret_backend(self, mock_hvac):
+ """Get Config Value from a Secret Backend"""
+ mock_client = mock.MagicMock()
+ mock_hvac.Client.return_value = mock_client
+ mock_client.secrets.kv.v2.read_secret_version.return_value = {
+ 'request_id': '2d48a2ad-6bcb-e5b6-429d-da35fdf31f56',
+ 'lease_id': '',
+ 'renewable': False,
+ 'lease_duration': 0,
+ 'data': {'data': {'value': 'sqlite:////Users/airflow/airflow/airflow.db'},
+ 'metadata': {'created_time': '2020-03-28T02:10:54.301784Z',
+ 'deletion_time': '',
+ 'destroyed': False,
+ 'version': 1}},
+ 'wrap_info': None,
+ 'warnings': None,
+ 'auth': None
+ }
+
+ test_config = '''[test]
+sql_alchemy_conn_secret = sql_alchemy_conn
+'''
+ test_config_default = '''[test]
+sql_alchemy_conn = airflow
+'''
+
+ test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default))
+ test_conf.read_string(test_config)
+ test_conf.sensitive_config_values = test_conf.sensitive_config_values | {
+ ('test', 'sql_alchemy_conn'),
+ }
+
+ self.assertEqual(
+ 'sqlite:////Users/airflow/airflow/airflow.db', test_conf.get('test', 'sql_alchemy_conn'))
+
def test_getboolean(self):
"""Test AirflowConfigParser.getboolean"""
TEST_CONFIG = """
@@ -410,7 +450,7 @@ AIRFLOW_HOME = /root/airflow
# Guarantee we have a deprecated setting, so we test the deprecation
# lookup even if we remove this explicit fallback
conf.deprecated_options['celery'] = {'result_backend': 'celery_result_backend'}
- conf.as_command_stdout.add(('celery', 'celery_result_backend'))
+ conf.sensitive_config_values.add(('celery', 'celery_result_backend'))
conf.remove_option('celery', 'result_backend')
conf.set('celery', 'celery_result_backend_cmd', '/bin/echo 99')
@@ -478,13 +518,13 @@ notacommand = OK
'''
test_cmdenv_conf = AirflowConfigParser()
test_cmdenv_conf.read_string(TEST_CMDENV_CONFIG)
- test_cmdenv_conf.as_command_stdout.add(('testcmdenv', 'itsacommand'))
+ test_cmdenv_conf.sensitive_config_values.add(('testcmdenv', 'itsacommand'))
with mock.patch.dict('os.environ'):
# AIRFLOW__TESTCMDENV__ITSACOMMAND_CMD maps to ('testcmdenv', 'itsacommand') in
- # as_command_stdout and therefore should return 'OK' from the environment variable's
+ # sensitive_config_values and therefore should return 'OK' from the environment variable's
# echo command, and must not return 'NOT OK' from the configuration
self.assertEqual(test_cmdenv_conf.get('testcmdenv', 'itsacommand'), 'OK')
- # AIRFLOW__TESTCMDENV__NOTACOMMAND_CMD maps to no entry in as_command_stdout and therefore
+ # AIRFLOW__TESTCMDENV__NOTACOMMAND_CMD maps to no entry in sensitive_config_values and therefore
# the option should return 'OK' from the configuration, and must not return 'NOT OK' from
# the environement variable's echo command
self.assertEqual(test_cmdenv_conf.get('testcmdenv', 'notacommand'), 'OK')