You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/15 16:06:47 UTC

[airflow] 11/28: Get Airflow configs with sensitive data from Secret Backends (#9645)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 313f61e0007314769cabdebbf98d9d8aa63bfe26
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Wed Jul 8 13:29:54 2020 +0100

    Get Airflow configs with sensitive data from Secret Backends (#9645)
    
    (cherry picked from commit 2f31b3060ed8274d5d1b1db7349ce607640b9199)
---
 airflow/configuration.py                       | 73 ++++++++++++++++++++++----
 airflow/contrib/secrets/aws_secrets_manager.py | 21 +++++++-
 airflow/contrib/secrets/hashicorp_vault.py     | 22 +++++++-
 airflow/secrets/__init__.py                    | 43 +++++++++------
 airflow/secrets/base_secrets.py                | 16 ++++--
 airflow/secrets/metastore.py                   |  5 +-
 docs/howto/set-config.rst                      | 29 ++++++++--
 tests/test_configuration.py                    | 50 ++++++++++++++++--
 8 files changed, 215 insertions(+), 44 deletions(-)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index 8720d77..290843f 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -102,6 +102,15 @@ def run_command(command):
     return output
 
 
+def _get_config_value_from_secret_backend(config_key):
+    """Get a config option value from the custom Secret Backend; None if no backend is configured"""
+    from airflow import secrets
+    secrets_client = secrets.get_custom_secret_backend()
+    if not secrets_client:
+        return None
+    return secrets_client.get_config(config_key)
+
+
 def _read_default_config_file(file_name):
     templates_dir = os.path.join(os.path.dirname(__file__), 'config_templates')
     file_path = os.path.join(templates_dir, file_name)
@@ -136,7 +145,9 @@ class AirflowConfigParser(ConfigParser):
     # These configuration elements can be fetched as the stdout of commands
     # following the "{section}__{name}__cmd" pattern, the idea behind this
     # is to not store password on boxes in text files.
-    as_command_stdout = {
+    # These configs can also be fetched from Secrets backend
+    # following the "{section}__{name}__secret" pattern
+    sensitive_config_values = {  # NOTE(review): renamed from as_command_stdout with no compat alias - confirm no external users
         ('core', 'sql_alchemy_conn'),
         ('core', 'fernet_key'),
         ('celery', 'broker_url'),
@@ -275,19 +286,32 @@ class AirflowConfigParser(ConfigParser):
         env_var_cmd = env_var + '_CMD'
         if env_var_cmd in os.environ:
             # if this is a valid command key...
-            if (section, key) in self.as_command_stdout:
+            if (section, key) in self.sensitive_config_values:
                 return run_command(os.environ[env_var_cmd])
+        # alternatively AIRFLOW__{SECTION}__{KEY}_SECRET (to get from Secrets Backend)
+        env_var_secret_path = env_var + '_SECRET'
+        if env_var_secret_path in os.environ:
+            # if this is a valid secret path...
+            if (section, key) in self.sensitive_config_values:
+                return _get_config_value_from_secret_backend(os.environ[env_var_secret_path])
 
     def _get_cmd_option(self, section, key):
         fallback_key = key + '_cmd'
         # if this is a valid command key...
-        if (section, key) in self.as_command_stdout:
-            if super(AirflowConfigParser, self) \
-                    .has_option(section, fallback_key):
-                command = super(AirflowConfigParser, self) \
-                    .get(section, fallback_key)
+        if (section, key) in self.sensitive_config_values:
+            if super(AirflowConfigParser, self).has_option(section, fallback_key):
+                command = super(AirflowConfigParser, self).get(section, fallback_key)
                 return run_command(command)
 
+    def _get_secret_option(self, section, key):
+        """Resolve a sensitive option via the Secret Backend path stored in its ``{key}_secret`` option"""
+        fallback_key = key + '_secret'
+        # if this is a valid secret key...
+        if (section, key) in self.sensitive_config_values:
+            if super(AirflowConfigParser, self).has_option(section, fallback_key):
+                secrets_path = super(AirflowConfigParser, self).get(section, fallback_key)
+                return _get_config_value_from_secret_backend(secrets_path)
+
     def get(self, section, key, **kwargs):
         section = str(section).lower()
         key = str(key).lower()
@@ -329,6 +353,16 @@ class AirflowConfigParser(ConfigParser):
                 self._warn_deprecate(section, key, deprecated_name)
                 return option
 
+        # ...then from secret backends
+        option = self._get_secret_option(section, key)
+        if option:
+            return option
+        if deprecated_name:
+            option = self._get_secret_option(section, deprecated_name)
+            if option:
+                self._warn_deprecate(section, key, deprecated_name)
+                return option
+
         # ...then the default config
         if self.airflow_defaults.has_option(section, key) or 'fallback' in kwargs:
             return expand_env_var(
@@ -466,7 +500,8 @@ class AirflowConfigParser(ConfigParser):
 
     def as_dict(
             self, display_source=False, display_sensitive=False, raw=False,
-            include_env=True, include_cmds=True):
+            include_env=True, include_cmds=True, include_secret=True
+    ):
         """
         Returns the current configuration as an OrderedDict of OrderedDicts.
 
@@ -488,6 +523,12 @@ class AirflowConfigParser(ConfigParser):
             set (True, default), or should the _cmd options be left as the
             command to run (False)
         :type include_cmds: bool
+        :param include_secret: Should the result of calling any *_secret config be
+            set (True, default), or should the _secret options be left as the
+            path to get the secret from (False)
+        :type include_secret: bool
+        :return: Dictionary, where the key is the name of the section and the content is
+            the dictionary with the name of the parameter and its value.
         """
         cfg = {}
         configs = [
@@ -529,7 +570,7 @@ class AirflowConfigParser(ConfigParser):
 
         # add bash commands
         if include_cmds:
-            for (section, key) in self.as_command_stdout:
+            for (section, key) in self.sensitive_config_values:
                 opt = self._get_cmd_option(section, key)
                 if opt:
                     if not display_sensitive:
@@ -541,6 +582,20 @@ class AirflowConfigParser(ConfigParser):
                     cfg.setdefault(section, OrderedDict()).update({key: opt})
                     del cfg[section][key + '_cmd']
 
+        # add config from secret backends (NOTE(review): overwrites any _cmd value set above, whereas get() prefers _cmd)
+        if include_secret:
+            for (section, key) in self.sensitive_config_values:
+                opt = self._get_secret_option(section, key)
+                if opt:
+                    if not display_sensitive:
+                        opt = '< hidden >'
+                    if display_source:
+                        opt = (opt, 'secret')
+                    elif raw:
+                        opt = opt.replace('%', '%%')
+                    cfg.setdefault(section, OrderedDict()).update({key: opt})
+                    del cfg[section][key + '_secret']
+
         return cfg
 
     def load_test_config(self):
diff --git a/airflow/contrib/secrets/aws_secrets_manager.py b/airflow/contrib/secrets/aws_secrets_manager.py
index 7896dac..4df9bdb 100644
--- a/airflow/contrib/secrets/aws_secrets_manager.py
+++ b/airflow/contrib/secrets/aws_secrets_manager.py
@@ -42,8 +42,11 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
 
     For example, if secrets prefix is ``airflow/connections/smtp_default``, this would be accessible
     if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``.
-    And if variables prefix is ``airflow/variables/hello``, this would be accessible
+    If variables prefix is ``airflow/variables/hello``, this would be accessible
     if you provide ``{"variables_prefix": "airflow/variables"}`` and request variable key ``hello``.
+    And if config_prefix is ``airflow/config/sql_alchemy_conn``, this would be accessible
+    if you provide ``{"config_prefix": "airflow/config"}`` and request config
+    key ``sql_alchemy_conn``.
 
     You can also pass additional keyword arguments like ``aws_secret_access_key``, ``aws_access_key_id``
     or ``region_name`` to this class and they would be passed on to Boto3 client.
@@ -52,6 +55,8 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
     :type connections_prefix: str
     :param variables_prefix: Specifies the prefix of the secret to read to get Variables.
     :type variables_prefix: str
+    :param config_prefix: Specifies the prefix of the secret to read to get Airflow Configurations.
+    :type config_prefix: str
     :param profile_name: The name of a profile to use. If not given, then the default profile is used.
     :type profile_name: str
     :param sep: separator used to concatenate secret_prefix and secret_id. Default: "/"
@@ -62,6 +67,7 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
         self,
         connections_prefix='airflow/connections',  # type: str
         variables_prefix='airflow/variables',  # type: str
+        config_prefix='airflow/config',  # type: str
         profile_name=None,  # type: Optional[str]
         sep="/",  # type: str
         **kwargs
@@ -69,6 +75,7 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
         super(SecretsManagerBackend, self).__init__(**kwargs)
         self.connections_prefix = connections_prefix.rstrip("/")
         self.variables_prefix = variables_prefix.rstrip('/')
+        self.config_prefix = config_prefix.rstrip('/')
         self.profile_name = profile_name
         self.sep = sep
         self.kwargs = kwargs
@@ -96,13 +103,23 @@ class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin):
     def get_variable(self, key):
         # type: (str) -> Optional[str]
         """
-        Get Airflow Variable from Environment Variable
+        Get Airflow Variable
 
         :param key: Variable Key
         :return: Variable Value
         """
         return self._get_secret(self.variables_prefix, key)
 
+    def get_config(self, key):
+        # type: (str) -> Optional[str]
+        """
+        Get Airflow Configuration from the secret ``<config_prefix><sep><key>``
+
+        :param key: Configuration Option Key
+        :return: Configuration Option Value (may be None)
+        """
+        return self._get_secret(self.config_prefix, key)
+
     def _get_secret(self, path_prefix, secret_id):
         # type: (str, str) -> Optional[str]
         """
diff --git a/airflow/contrib/secrets/hashicorp_vault.py b/airflow/contrib/secrets/hashicorp_vault.py
index 8f6b53c..536e7f9 100644
--- a/airflow/contrib/secrets/hashicorp_vault.py
+++ b/airflow/contrib/secrets/hashicorp_vault.py
@@ -55,6 +55,9 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
     :param variables_path: Specifies the path of the secret to read to get Variables.
         (default: 'variables')
     :type variables_path: str
+    :param config_path: Specifies the path of the secret to read to get Airflow Configurations.
+        (default: 'config')
+    :type config_path: str
     :param url: Base URL for the Vault instance being addressed.
     :type url: str
     :param auth_type: Authentication Type for Vault (one of 'token', 'ldap', 'userpass', 'approle',
@@ -89,6 +92,7 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
         self,
         connections_path='connections',  # type: str
         variables_path='variables',  # type: str
+        config_path='config',  # type: str
         url=None,  # type: Optional[str]
         auth_type='token',  # type: str
         mount_point='secret',  # type: str
@@ -104,9 +108,10 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
         gcp_scopes=None,  # type: Optional[str]
         **kwargs
     ):
-        super(VaultBackend, self).__init__(**kwargs)
+        super(VaultBackend, self).__init__()  # NOTE(review): **kwargs no longer forwarded here, unlike the AWS backend
         self.connections_path = connections_path.rstrip('/')
         self.variables_path = variables_path.rstrip('/')
+        self.config_path = config_path.rstrip('/')
         self.url = url
         self.auth_type = auth_type
         self.kwargs = kwargs
@@ -179,7 +184,7 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
     def get_variable(self, key):
         # type: (str) -> Optional[str]
         """
-        Get Airflow Variable from Environment Variable
+        Get Airflow Variable
 
         :param key: Variable Key
         :return: Variable Value
@@ -213,3 +218,16 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
 
         return_data = response["data"] if self.kv_engine_version == 1 else response["data"]["data"]
         return return_data
+
+    def get_config(self, key):
+        # type: (str) -> Optional[str]
+        """
+        Get Airflow Configuration
+
+        :param key: Configuration Option Key
+        :type key: str
+        :rtype: Optional[str]
+        :return: The secret's "value" field from the vault, or None if the secret or field is missing
+        """
+        response = self._get_secret(self.config_path, key)
+        return response.get("value") if response else None
diff --git a/airflow/secrets/__init__.py b/airflow/secrets/__init__.py
index 57dde4b..8736ac6 100644
--- a/airflow/secrets/__init__.py
+++ b/airflow/secrets/__init__.py
@@ -22,17 +22,20 @@ Secrets framework provides means of getting connection objects from various sour
     * Metatsore database
     * AWS SSM Parameter store
 """
-__all__ = ['BaseSecretsBackend', 'get_connections', 'get_variable']
+__all__ = ['BaseSecretsBackend', 'get_connections', 'get_variable', 'get_custom_secret_backend']
 
 import json
-from typing import List, Optional
+from typing import TYPE_CHECKING, List, Optional
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models.connection import Connection
 from airflow.secrets.base_secrets import BaseSecretsBackend
 from airflow.utils.module_loading import import_string
 
+if TYPE_CHECKING:
+    from airflow.models.connection import Connection
+
+
 CONFIG_SECTION = "secrets"
 DEFAULT_SECRETS_SEARCH_PATH = [
     "airflow.secrets.environment_variables.EnvironmentVariablesBackend",
@@ -41,7 +44,7 @@ DEFAULT_SECRETS_SEARCH_PATH = [
 
 
 def get_connections(conn_id):
-    # type: (str) -> List[Connection]
+    # type: (str) -> List['Connection']
     """
     Get all connections as an iterable.
 
@@ -72,25 +75,35 @@ def get_variable(key):
     return None
 
 
+def get_custom_secret_backend():
+    # type: (...) -> Optional[BaseSecretsBackend]
+    """Instantiate the backend class named by ``[secrets] backend`` (with ``backend_kwargs``), or return None"""
+    alternative_secrets_backend = conf.get(section=CONFIG_SECTION, key='backend', fallback='')
+
+    if alternative_secrets_backend:
+        try:
+            alternative_secrets_config_dict = json.loads(
+                conf.get(section=CONFIG_SECTION, key='backend_kwargs', fallback='{}')
+            )
+        except ValueError:  # malformed backend_kwargs JSON: silently fall back to no kwargs
+            alternative_secrets_config_dict = {}
+        secrets_backend_cls = import_string(alternative_secrets_backend)
+        return secrets_backend_cls(**alternative_secrets_config_dict)
+    return None
+
+
 def initialize_secrets_backends():
     # type: (...) -> List[BaseSecretsBackend]
     """
     * import secrets backend classes
     * instantiate them and return them in a list
     """
-    alternative_secrets_backend = conf.get(section=CONFIG_SECTION, key='backend', fallback='')
-    try:
-        alternative_secrets_config_dict = json.loads(
-            conf.get(section=CONFIG_SECTION, key='backend_kwargs', fallback='{}')
-        )
-    except ValueError:
-        alternative_secrets_config_dict = {}
-
     backend_list = []
 
-    if alternative_secrets_backend:
-        secrets_backend_cls = import_string(alternative_secrets_backend)
-        backend_list.append(secrets_backend_cls(**alternative_secrets_config_dict))
+    custom_secret_backend = get_custom_secret_backend()
+
+    if custom_secret_backend is not None:
+        backend_list.append(custom_secret_backend)
 
     for class_name in DEFAULT_SECRETS_SEARCH_PATH:
         secrets_backend_cls = import_string(class_name)
diff --git a/airflow/secrets/base_secrets.py b/airflow/secrets/base_secrets.py
index 2394f40..a8c0e6b 100644
--- a/airflow/secrets/base_secrets.py
+++ b/airflow/secrets/base_secrets.py
@@ -16,9 +16,7 @@
 # under the License.
 
 from abc import ABCMeta
-from typing import List, Optional
-
-from airflow.models.connection import Connection
+from typing import Optional
 
 
 class BaseSecretsBackend:
@@ -56,13 +54,13 @@ class BaseSecretsBackend:
         raise NotImplementedError()
 
     def get_connections(self, conn_id):
-        # type: (str) -> List[Connection]
         """
         Get connections with a specific ID
 
         :param conn_id: connection id
         :type conn_id: str
         """
+        from airflow.models.connection import Connection  # deferred import; presumably avoids an import cycle - confirm
         conn_uri = self.get_conn_uri(conn_id=conn_id)
         if not conn_uri:
             return []
@@ -78,3 +76,13 @@ class BaseSecretsBackend:
         :return: Variable Value
         """
         raise NotImplementedError()
+
+    def get_config(self, key):    # pylint: disable=unused-argument
+        # type: (str) -> Optional[str]
+        """
+        Return value for Airflow Config Key (this base implementation always returns None)
+
+        :param key: Config Key
+        :return: Config Value, or None
+        """
+        return None
diff --git a/airflow/secrets/metastore.py b/airflow/secrets/metastore.py
index 51d8740..f1412e9 100644
--- a/airflow/secrets/metastore.py
+++ b/airflow/secrets/metastore.py
@@ -19,9 +19,6 @@
 Objects relating to sourcing connections from metastore database
 """
 
-from typing import List
-
-from airflow.models.connection import Connection
 from airflow.secrets import BaseSecretsBackend
 from airflow.utils.db import provide_session
 
@@ -34,7 +31,7 @@ class MetastoreBackend(BaseSecretsBackend):
     # pylint: disable=missing-docstring
     @provide_session
     def get_connections(self, conn_id, session=None):
-        # type: (...) -> List[Connection]
+        from airflow.models.connection import Connection  # deferred import; presumably avoids an import cycle - confirm
         conn_list = session.query(Connection).filter(Connection.conn_id == conn_id).all()
         session.expunge_all()
         return conn_list
diff --git a/docs/howto/set-config.rst b/docs/howto/set-config.rst
index 035bc29..270fd7f 100644
--- a/docs/howto/set-config.rst
+++ b/docs/howto/set-config.rst
@@ -47,7 +47,21 @@ the key like this:
     [core]
     sql_alchemy_conn_cmd = bash_command_to_run
 
-The following config options support this ``_cmd`` version:
+You can also derive the connection string at run time by appending ``_secret`` to
+the key like this:
+
+.. code-block:: ini
+
+    [core]
+    sql_alchemy_conn_secret = sql_alchemy_conn
+    # You can also add a nested path
+    # example:
+    # sql_alchemy_conn_secret = core/sql_alchemy_conn
+
+This retrieves the config option from the configured Secrets Backend, e.g. HashiCorp Vault. See
+:ref:`Secrets Backends<secrets_backend_configuration>` for more details.
+
+The following config options support this ``_cmd`` and ``_secret`` version:
 
 * ``sql_alchemy_conn`` in ``[core]`` section
 * ``fernet_key`` in ``[core]`` section
@@ -66,12 +80,21 @@ the same way the usual config options can. For example:
 
     export AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD=bash_command_to_run
 
+Similarly, ``_secret`` config options can also be set using a corresponding environment variable.
+For example:
+
+.. code-block:: bash
+
+    export AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET=sql_alchemy_conn
+
 The idea behind this is to not store passwords on boxes in plain text files.
 
 The universal order of precedence for all configuration options is as follows:
 
-#. set as an environment variable
-#. set as a command environment variable
+#. set as an environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN``)
+#. set as a command environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD``)
+#. set as a secret environment variable (``AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET``)
 #. set in ``airflow.cfg``
 #. command in ``airflow.cfg``
+#. secret key in ``airflow.cfg``
 #. Airflow's built in defaults
diff --git a/tests/test_configuration.py b/tests/test_configuration.py
index 5c40cad..fcf7964 100644
--- a/tests/test_configuration.py
+++ b/tests/test_configuration.py
@@ -191,7 +191,7 @@ key6 = value6
         test_conf = AirflowConfigParser(
             default_config=parameterized_config(TEST_CONFIG_DEFAULT))
         test_conf.read_string(TEST_CONFIG)
-        test_conf.as_command_stdout = test_conf.as_command_stdout | {
+        test_conf.sensitive_config_values = test_conf.sensitive_config_values | {
             ('test', 'key2'),
             ('test', 'key4'),
         }
@@ -225,6 +225,46 @@ key6 = value6
         self.assertNotIn('key4', cfg_dict['test'])
         self.assertEqual('printf key4_result', cfg_dict['test']['key4_cmd'])
 
+    @mock.patch("airflow.contrib.secrets.hashicorp_vault.hvac")
+    @conf_vars({
+        ("secrets", "backend"): "airflow.contrib.secrets.hashicorp_vault.VaultBackend",
+        ("secrets", "backend_kwargs"): '{"url": "http://127.0.0.1:8200", "token": "token"}',
+    })
+    def test_config_from_secret_backend(self, mock_hvac):
+        """A ``<key>_secret`` option is resolved via the configured (mocked) Vault backend, overriding the default"""
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+        mock_client.secrets.kv.v2.read_secret_version.return_value = {
+            'request_id': '2d48a2ad-6bcb-e5b6-429d-da35fdf31f56',
+            'lease_id': '',
+            'renewable': False,
+            'lease_duration': 0,
+            'data': {'data': {'value': 'sqlite:////Users/airflow/airflow/airflow.db'},
+                     'metadata': {'created_time': '2020-03-28T02:10:54.301784Z',
+                                  'deletion_time': '',
+                                  'destroyed': False,
+                                  'version': 1}},
+            'wrap_info': None,
+            'warnings': None,
+            'auth': None
+        }
+
+        test_config = '''[test]
+sql_alchemy_conn_secret = sql_alchemy_conn
+'''
+        test_config_default = '''[test]
+sql_alchemy_conn = airflow
+'''
+
+        test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default))
+        test_conf.read_string(test_config)
+        test_conf.sensitive_config_values = test_conf.sensitive_config_values | {
+            ('test', 'sql_alchemy_conn'),
+        }
+
+        self.assertEqual(
+            'sqlite:////Users/airflow/airflow/airflow.db', test_conf.get('test', 'sql_alchemy_conn'))
+
     def test_getboolean(self):
         """Test AirflowConfigParser.getboolean"""
         TEST_CONFIG = """
@@ -410,7 +450,7 @@ AIRFLOW_HOME = /root/airflow
         # Guarantee we have a deprecated setting, so we test the deprecation
         # lookup even if we remove this explicit fallback
         conf.deprecated_options['celery'] = {'result_backend': 'celery_result_backend'}
-        conf.as_command_stdout.add(('celery', 'celery_result_backend'))
+        conf.sensitive_config_values.add(('celery', 'celery_result_backend'))
 
         conf.remove_option('celery', 'result_backend')
         conf.set('celery', 'celery_result_backend_cmd', '/bin/echo 99')
@@ -478,13 +518,13 @@ notacommand = OK
 '''
         test_cmdenv_conf = AirflowConfigParser()
         test_cmdenv_conf.read_string(TEST_CMDENV_CONFIG)
-        test_cmdenv_conf.as_command_stdout.add(('testcmdenv', 'itsacommand'))
+        test_cmdenv_conf.sensitive_config_values.add(('testcmdenv', 'itsacommand'))
         with mock.patch.dict('os.environ'):
             # AIRFLOW__TESTCMDENV__ITSACOMMAND_CMD maps to ('testcmdenv', 'itsacommand') in
-            # as_command_stdout and therefore should return 'OK' from the environment variable's
+            # sensitive_config_values and therefore should return 'OK' from the environment variable's
             # echo command, and must not return 'NOT OK' from the configuration
             self.assertEqual(test_cmdenv_conf.get('testcmdenv', 'itsacommand'), 'OK')
-            # AIRFLOW__TESTCMDENV__NOTACOMMAND_CMD maps to no entry in as_command_stdout and therefore
+            # AIRFLOW__TESTCMDENV__NOTACOMMAND_CMD maps to no entry in sensitive_config_values and therefore
             # the option should return 'OK' from the configuration, and must not return 'NOT OK' from
             # the environement variable's echo command
             self.assertEqual(test_cmdenv_conf.get('testcmdenv', 'notacommand'), 'OK')