You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/08/24 22:17:29 UTC

[airflow] branch master updated: Make configuration.py Pylint compatible (#10494)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1775474  Make configuration.py Pylint compatible (#10494)
1775474 is described below

commit 1775474484e447568f9ee8e45431ec20f7ae8bd7
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Tue Aug 25 00:16:54 2020 +0200

    Make configuration.py Pylint compatible (#10494)
---
 airflow/configuration.py   | 266 ++++++++++++++++++++++++++++-----------------
 scripts/ci/pylint_todo.txt |   1 -
 2 files changed, 165 insertions(+), 102 deletions(-)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index a30e8b2..cbd281c 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -118,7 +118,7 @@ def default_config_yaml() -> dict:
         return yaml.safe_load(config_file)
 
 
-class AirflowConfigParser(ConfigParser):
+class AirflowConfigParser(ConfigParser):  # pylint: disable=too-many-ancestors
     """Custom Airflow Configparser supporting defaults and deprecated options"""
 
     # These configuration elements can be fetched as the stdout of commands
@@ -243,7 +243,7 @@ class AirflowConfigParser(ConfigParser):
                     "mp_start_method should not be " + mp_start_method +
                     ". Possible values are " + ", ".join(start_method_options))
 
-    def _using_old_value(self, old, current_value):
+    def _using_old_value(self, old, current_value):  # noqa
         return old.search(current_value) is not None
 
     def _update_env_var(self, section, name, new_value):
@@ -286,6 +286,7 @@ class AirflowConfigParser(ConfigParser):
             # if this is a valid secret path...
             if (section, key) in self.sensitive_config_values:
                 return _get_config_value_from_secret_backend(os.environ[env_var_secret_path])
+        return None
 
     def _get_cmd_option(self, section, key):
         fallback_key = key + '_cmd'
@@ -294,6 +295,7 @@ class AirflowConfigParser(ConfigParser):
             if super().has_option(section, fallback_key):
                 command = super().get(section, fallback_key)
                 return run_command(command)
+        return None
 
     def _get_secret_option(self, section, key):
         """Get Config option values from Secret Backend"""
@@ -303,6 +305,7 @@ class AirflowConfigParser(ConfigParser):
             if super().has_option(section, fallback_key):
                 secrets_path = super().get(section, fallback_key)
                 return _get_config_value_from_secret_backend(secrets_path)
+        return None
 
     def get(self, section, key, **kwargs):
         section = str(section).lower()
@@ -310,16 +313,64 @@ class AirflowConfigParser(ConfigParser):
 
         deprecated_section, deprecated_key = self.deprecated_options.get((section, key), (None, None))
 
-        # first check environment variables
-        option = self._get_env_var_option(section, key)
+        option = self._get_environment_variables(deprecated_key, deprecated_section, key, section)
+        if option is not None:
+            return option
+
+        option = self._get_option_from_config_file(deprecated_key, deprecated_section, key, kwargs, section)
+        if option is not None:
+            return option
+
+        option = self._get_option_from_commands(deprecated_key, deprecated_section, key, section)
+        if option is not None:
+            return option
+
+        option = self._get_option_from_secrets(deprecated_key, deprecated_section, key, section)
         if option is not None:
             return option
+
+        return self._get_option_from_default_config(section, key, **kwargs)
+
+    def _get_option_from_default_config(self, section, key, **kwargs):
+        # ...then the default config
+        if self.airflow_defaults.has_option(section, key) or 'fallback' in kwargs:
+            return expand_env_var(
+                self.airflow_defaults.get(section, key, **kwargs))
+
+        else:
+            log.warning(
+                "section/key [%s/%s] not found in config", section, key
+            )
+
+            raise AirflowConfigException(
+                "section/key [{section}/{key}] not found "
+                "in config".format(section=section, key=key))
+
+    def _get_option_from_secrets(self, deprecated_key, deprecated_section, key, section):
+        # ...then from secret backends
+        option = self._get_secret_option(section, key)
+        if option:
+            return option
         if deprecated_section:
-            option = self._get_env_var_option(deprecated_section, deprecated_key)
-            if option is not None:
+            option = self._get_secret_option(deprecated_section, deprecated_key)
+            if option:
                 self._warn_deprecate(section, key, deprecated_section, deprecated_key)
                 return option
+        return None
 
+    def _get_option_from_commands(self, deprecated_key, deprecated_section, key, section):
+        # ...then commands
+        option = self._get_cmd_option(section, key)
+        if option:
+            return option
+        if deprecated_section:
+            option = self._get_cmd_option(deprecated_section, deprecated_key)
+            if option:
+                self._warn_deprecate(section, key, deprecated_section, deprecated_key)
+                return option
+        return None
+
+    def _get_option_from_config_file(self, deprecated_key, deprecated_section, key, kwargs, section):
         # ...then the config file
         if super().has_option(section, key):
             # Use the parent's methods to get the actual config here to be able to
@@ -334,40 +385,19 @@ class AirflowConfigParser(ConfigParser):
                     deprecated_key,
                     **kwargs
                 ))
+        return None
 
-        # ...then commands
-        option = self._get_cmd_option(section, key)
-        if option:
-            return option
-        if deprecated_section:
-            option = self._get_cmd_option(deprecated_section, deprecated_key)
-            if option:
-                self._warn_deprecate(section, key, deprecated_section, deprecated_key)
-                return option
-
-        # ...then from secret backends
-        option = self._get_secret_option(section, key)
-        if option:
+    def _get_environment_variables(self, deprecated_key, deprecated_section, key, section):
+        # first check environment variables
+        option = self._get_env_var_option(section, key)
+        if option is not None:
             return option
         if deprecated_section:
-            option = self._get_secret_option(deprecated_section, deprecated_key)
-            if option:
+            option = self._get_env_var_option(deprecated_section, deprecated_key)
+            if option is not None:
                 self._warn_deprecate(section, key, deprecated_section, deprecated_key)
                 return option
-
-        # ...then the default config
-        if self.airflow_defaults.has_option(section, key) or 'fallback' in kwargs:
-            return expand_env_var(
-                self.airflow_defaults.get(section, key, **kwargs))
-
-        else:
-            log.warning(
-                "section/key [%s/%s] not found in config", section, key
-            )
-
-            raise AirflowConfigException(
-                "section/key [{section}/{key}] not found "
-                "in config".format(section=section, key=key))
+        return None
 
     def getboolean(self, section, key, **kwargs):
         val = str(self.get(section, key, **kwargs)).lower().strip()
@@ -405,7 +435,7 @@ class AirflowConfigParser(ConfigParser):
                 f'Current value: "{val}".'
             )
 
-    def getimport(self, section, key, **kwargs):
+    def getimport(self, section, key, **kwargs):  # noqa
         """
         Reads options, imports the full qualified name, and returns the object.
 
@@ -426,12 +456,12 @@ class AirflowConfigParser(ConfigParser):
                 f'Current value: "{full_qualified_path}".'
             )
 
-    def read(self, filenames, **kwargs):
-        super().read(filenames, **kwargs)
+    def read(self, filenames, encoding=None):
+        super().read(filenames=filenames, encoding=encoding)
         self._validate()
 
-    def read_dict(self, *args, **kwargs):
-        super().read_dict(*args, **kwargs)
+    def read_dict(self, dictionary, source='<dict>'):
+        super().read_dict(dictionary=dictionary, source=source)
         self._validate()
 
     def has_option(self, section, option):
@@ -456,6 +486,7 @@ class AirflowConfigParser(ConfigParser):
         if self.airflow_defaults.has_option(section, option) and remove_default:
             self.airflow_defaults.remove_option(section, option)
 
+    # noinspection PyProtectedMember
     def getsection(self, section: str) -> Optional[Dict[str, Union[str, int, float, bool]]]:
         """
         Returns the section as a dict. Values are converted to int, float, bool
@@ -464,10 +495,12 @@ class AirflowConfigParser(ConfigParser):
         :param section: section from the config
         :rtype: dict
         """
-        if (section not in self._sections and section not in self.airflow_defaults._sections):  # type: ignore
+        # pylint: disable=protected-access
+        if section not in self._sections and section not in self.airflow_defaults._sections:  # type: ignore
             return None
+        # pylint: enable=protected-access
 
-        _section = copy.deepcopy(self.airflow_defaults._sections[section])
+        _section = copy.deepcopy(self.airflow_defaults._sections[section])  # pylint: disable=protected-access
 
         if section in self._sections:  # type: ignore
             _section.update(copy.deepcopy(self._sections[section]))  # type: ignore
@@ -499,13 +532,13 @@ class AirflowConfigParser(ConfigParser):
         # This is based on the configparser.RawConfigParser.write method code to add support for
         # reading options from environment variables.
         if space_around_delimiters:
-            d = " {} ".format(self._delimiters[0])
+            delimiter = " {} ".format(self._delimiters[0])
         else:
-            d = self._delimiters[0]
+            delimiter = self._delimiters[0]
         if self._defaults:
-            self._write_section(fp, self.default_section, self._defaults.items(), d)
+            self._write_section(fp, self.default_section, self._defaults.items(), delimiter)
         for section in self._sections:
-            self._write_section(fp, section, self.getsection(section).items(), d)
+            self._write_section(fp, section, self.getsection(section).items(), delimiter)
 
     def as_dict(
             self, display_source=False, display_sensitive=False, raw=False,
@@ -540,73 +573,94 @@ class AirflowConfigParser(ConfigParser):
         :return: Dictionary, where the key is the name of the section and the content is
             the dictionary with the name of the parameter and its value.
         """
-        cfg: Dict[str, Dict[str, str]] = {}
+        config_sources: Dict[str, Dict[str, str]] = {}
         configs = [
             ('default', self.airflow_defaults),
             ('airflow.cfg', self),
         ]
 
-        for (source_name, config) in configs:
-            for section in config.sections():
-                sect = cfg.setdefault(section, OrderedDict())
-                for (k, val) in config.items(section=section, raw=raw):
-                    if display_source:
-                        val = (val, source_name)
-                    sect[k] = val
+        self._replace_config_with_display_sources(config_sources, configs, display_source, raw)
 
         # add env vars and overwrite because they have priority
         if include_env:
-            for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]:
-                try:
-                    _, section, key = ev.split('__', 2)
-                    opt = self._get_env_var_option(section, key)
-                except ValueError:
-                    continue
-                if not display_sensitive and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE':
-                    opt = '< hidden >'
-                elif raw:
-                    opt = opt.replace('%', '%%')
-                if display_source:
-                    opt = (opt, 'env var')
-
-                section = section.lower()
-                # if we lower key for kubernetes_environment_variables section,
-                # then we won't be able to set any Airflow environment
-                # variables. Airflow only parse environment variables starts
-                # with AIRFLOW_. Therefore, we need to make it a special case.
-                if section != 'kubernetes_environment_variables':
-                    key = key.lower()
-                cfg.setdefault(section, OrderedDict()).update({key: opt})
+            self._include_envs(config_sources, display_sensitive, display_source, raw)
 
         # add bash commands
         if include_cmds:
-            for (section, key) in self.sensitive_config_values:
-                opt = self._get_cmd_option(section, key)
-                if opt:
-                    if not display_sensitive:
-                        opt = '< hidden >'
-                    if display_source:
-                        opt = (opt, 'cmd')
-                    elif raw:
-                        opt = opt.replace('%', '%%')
-                    cfg.setdefault(section, OrderedDict()).update({key: opt})
-                    del cfg[section][key + '_cmd']
+            self._include_commands(config_sources, display_sensitive, display_source, raw)
 
         # add config from secret backends
         if include_secret:
-            for (section, key) in self.sensitive_config_values:
-                opt = self._get_secret_option(section, key)
-                if opt:
-                    if not display_sensitive:
-                        opt = '< hidden >'
-                    if display_source:
-                        opt = (opt, 'secret')
-                    elif raw:
-                        opt = opt.replace('%', '%%')
-                    cfg.setdefault(section, OrderedDict()).update({key: opt})
-                    del cfg[section][key + '_secret']
-
-        return cfg
+            self._include_secrets(config_sources, display_sensitive, display_source, raw)
+        return config_sources
+
+    def _include_secrets(self, config_sources, display_sensitive, display_source, raw):
+        for (section, key) in self.sensitive_config_values:
+            opt = self._get_secret_option(section, key)
+            if opt:
+                if not display_sensitive:
+                    opt = '< hidden >'
+                if display_source:
+                    opt = (opt, 'secret')
+                elif raw:
+                    opt = opt.replace('%', '%%')
+                config_sources.setdefault(section, OrderedDict()).update({key: opt})
+                del config_sources[section][key + '_secret']
+
+    def _include_commands(self, config_sources, display_sensitive, display_source, raw):
+        for (section, key) in self.sensitive_config_values:
+            opt = self._get_cmd_option(section, key)
+            if not opt:
+                continue
+            if not display_sensitive:
+                opt = '< hidden >'
+            if display_source:
+                opt = (opt, 'cmd')
+            elif raw:
+                opt = opt.replace('%', '%%')
+            config_sources.setdefault(section, OrderedDict()).update({key: opt})
+            del config_sources[section][key + '_cmd']
+
+    def _include_envs(self, config_sources, display_sensitive, display_source, raw):
+        for env_var in [os_environment
+                        for os_environment in os.environ if os_environment.startswith('AIRFLOW__')]:
+            try:
+                _, section, key = env_var.split('__', 2)
+                opt = self._get_env_var_option(section, key)
+            except ValueError:
+                continue
+            if not display_sensitive and env_var != 'AIRFLOW__CORE__UNIT_TEST_MODE':
+                opt = '< hidden >'
+            elif raw:
+                opt = opt.replace('%', '%%')
+            if display_source:
+                opt = (opt, 'env var')
+
+            section = section.lower()
+            # if we lower key for kubernetes_environment_variables section,
+            # then we won't be able to set any Airflow environment
+            # variables. Airflow only parse environment variables starts
+            # with AIRFLOW_. Therefore, we need to make it a special case.
+            if section != 'kubernetes_environment_variables':
+                key = key.lower()
+            config_sources.setdefault(section, OrderedDict()).update({key: opt})
+
+    @staticmethod
+    def _replace_config_with_display_sources(config_sources, configs, display_source, raw):
+        for (source_name, config) in configs:
+            for section in config.sections():
+                AirflowConfigParser.\
+                    _replace_section_config_with_display_sources(
+                        config, config_sources, display_source, raw, section, source_name)
+
+    @staticmethod
+    def _replace_section_config_with_display_sources(config, config_sources, display_source, raw, section,
+                                                     source_name):
+        sect = config_sources.setdefault(section, OrderedDict())
+        for (k, val) in config.items(section=section, raw=raw):
+            if display_source:
+                val = (val, source_name)
+            sect[k] = val
 
     def load_test_config(self):
         """
@@ -701,7 +755,7 @@ def parameterized_config(template):
     :param template: a config content templated with {{variables}}
     """
     all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
-    return template.format(**all_vars)
+    return template.format(**all_vars)  # noqa
 
 
 def get_airflow_test_config(airflow_home):
@@ -780,6 +834,7 @@ if conf.getboolean('core', 'unit_test_mode'):
 
 # Historical convenience functions to access config entries
 def load_test_config():   # noqa: D103
+    """Historical load_test_config"""
     warnings.warn(
         "Accessing configuration method 'load_test_config' directly from the configuration module is "
         "deprecated. Please access the configuration from the 'configuration.conf' object via "
@@ -791,6 +846,7 @@ def load_test_config():   # noqa: D103
 
 
 def get(*args, **kwargs):   # noqa: D103
+    """Historical get"""
     warnings.warn(
         "Accessing configuration method 'get' directly from the configuration module is "
         "deprecated. Please access the configuration from the 'configuration.conf' object via "
@@ -802,6 +858,7 @@ def get(*args, **kwargs):   # noqa: D103
 
 
 def getboolean(*args, **kwargs):   # noqa: D103
+    """Historical getboolean"""
     warnings.warn(
         "Accessing configuration method 'getboolean' directly from the configuration module is "
         "deprecated. Please access the configuration from the 'configuration.conf' object via "
@@ -813,6 +870,7 @@ def getboolean(*args, **kwargs):   # noqa: D103
 
 
 def getfloat(*args, **kwargs):   # noqa: D103
+    """Historical getfloat"""
     warnings.warn(
         "Accessing configuration method 'getfloat' directly from the configuration module is "
         "deprecated. Please access the configuration from the 'configuration.conf' object via "
@@ -824,6 +882,7 @@ def getfloat(*args, **kwargs):   # noqa: D103
 
 
 def getint(*args, **kwargs):   # noqa: D103
+    """Historical getint"""
     warnings.warn(
         "Accessing configuration method 'getint' directly from the configuration module is "
         "deprecated. Please access the configuration from the 'configuration.conf' object via "
@@ -835,6 +894,7 @@ def getint(*args, **kwargs):   # noqa: D103
 
 
 def getsection(*args, **kwargs):   # noqa: D103
+    """Historical getsection"""
     warnings.warn(
         "Accessing configuration method 'getsection' directly from the configuration module is "
         "deprecated. Please access the configuration from the 'configuration.conf' object via "
@@ -846,6 +906,7 @@ def getsection(*args, **kwargs):   # noqa: D103
 
 
 def has_option(*args, **kwargs):   # noqa: D103
+    """Historical has_option"""
     warnings.warn(
         "Accessing configuration method 'has_option' directly from the configuration module is "
         "deprecated. Please access the configuration from the 'configuration.conf' object via "
@@ -857,6 +918,7 @@ def has_option(*args, **kwargs):   # noqa: D103
 
 
 def remove_option(*args, **kwargs):   # noqa: D103
+    """Historical remove_option"""
     warnings.warn(
         "Accessing configuration method 'remove_option' directly from the configuration module is "
         "deprecated. Please access the configuration from the 'configuration.conf' object via "
@@ -868,6 +930,7 @@ def remove_option(*args, **kwargs):   # noqa: D103
 
 
 def as_dict(*args, **kwargs):   # noqa: D103
+    """Historical as_dict"""
     warnings.warn(
         "Accessing configuration method 'as_dict' directly from the configuration module is "
         "deprecated. Please access the configuration from the 'configuration.conf' object via "
@@ -878,7 +941,8 @@ def as_dict(*args, **kwargs):   # noqa: D103
     return conf.as_dict(*args, **kwargs)
 
 
-def set(*args, **kwargs):   # noqa: D103
+def set(*args, **kwargs):  # noqa pylint: disable=redefined-builtin
+    """Historical set"""
     warnings.warn(
         "Accessing configuration method 'set' directly from the configuration module is "
         "deprecated. Please access the configuration from the 'configuration.conf' object via "
diff --git a/scripts/ci/pylint_todo.txt b/scripts/ci/pylint_todo.txt
index 17c6103..ac9fdc2 100644
--- a/scripts/ci/pylint_todo.txt
+++ b/scripts/ci/pylint_todo.txt
@@ -1,4 +1,3 @@
-./airflow/configuration.py
 ./airflow/models/dag.py
 ./airflow/models/dagrun.py
 ./airflow/www/utils.py