Posted to commits@airflow.apache.org by sa...@apache.org on 2016/08/12 17:35:12 UTC

incubator-airflow git commit: [AIRFLOW-328][AIRFLOW-371] Remove redundant default configuration & fix unit test configuration

Repository: incubator-airflow
Updated Branches:
  refs/heads/master c39e4f6aa -> 7662cd8ce


[AIRFLOW-328][AIRFLOW-371] Remove redundant default configuration & fix unit test configuration

AIRFLOW-328
https://issues.apache.org/jira/browse/AIRFLOW-328
Previously, Airflow had both a default template for airflow.cfg AND a
dictionary of default values. The two frequently got out of sync (an
option in one had a different value than in the other, or wasn't present
in the other). This commit removes the default dict and uses the
airflow.cfg template to provide defaults. The ConfigParser first reads
the template, loading all the options it contains, and then reads the
user's actual airflow.cfg to overwrite the default values with any new
ones.
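
For illustration, a minimal Python 3 sketch of that layered read (the tiny
template and the path are invented for this example; the real parser in the
diff below also handles Python 2 via six, env-var overrides, and "_cmd"
fallbacks):

    from textwrap import dedent
    from configparser import ConfigParser

    # stand-in for the packaged airflow.cfg template (DEFAULT_CONFIG)
    DEFAULT_CONFIG = dedent("""\
        [webserver]
        dag_orientation = LR
        demo_mode = False
        """)

    parser = ConfigParser()
    # 1. read the template first, so every option has a default value
    parser.read_string(DEFAULT_CONFIG)
    # 2. then read the user's file; keys it defines overwrite the defaults,
    #    and read() silently skips paths that don't exist
    parser.read('/path/to/airflow.cfg')  # hypothetical path

    print(parser.get('webserver', 'dag_orientation'))  # 'LR' unless overridden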

AIRFLOW-371
https://issues.apache.org/jira/browse/AIRFLOW-371
Calling test_mode() didn't actually change Airflow's configuration.
This went unnoticed in unit tests only because the unit test run script
was hardcoded to point at the unittest.cfg file, but it still needed to
be fixed.

[AIRFLOW-371] Make test_mode() functional

Previously, calling test_mode() didn't actually
do anything.

This PR renames it to load_test_config() (to
avoid confusion, ht @r39132).

In addition, entering test mode manually after
Airflow launches might be too late: some
options have already been loaded (DAGS_FOLDER,
etc.). This makes it so setting
core/unit_test_mode OR the equivalent env var
(AIRFLOW__CORE__UNIT_TEST_MODE) will load the
test config immediately, prior to loading the
rest of Airflow.
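
A condensed, self-contained sketch of that import-time check (the helper
below is invented for illustration; in the diff, the equivalent logic lives
in AirflowConfigParser.get(), which consults env vars first, and in the
module-level conf.getboolean('core', 'unit_test_mode') call):

    import os

    def _env_override(section, key, file_value):
        # env vars named AIRFLOW__{SECTION}__{KEY} take priority over the file
        env_key = 'AIRFLOW__{}__{}'.format(section.upper(), key.upper())
        return os.environ.get(env_key, file_value)

    # runs at import time, before DAGS_FOLDER and friends are resolved
    if _env_override('core', 'unit_test_mode', 'False').lower() in ('t', 'true', '1'):
        print('would call load_test_config() here')

Setting AIRFLOW__CORE__UNIT_TEST_MODE=True in the shell (as run_unit_tests.sh
now does, below) therefore flips the configuration before any other part of
Airflow reads it.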

Closes #1677 from jlowin/Simplify-config


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7662cd8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7662cd8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7662cd8c

Branch: refs/heads/master
Commit: 7662cd8ce487134320023ce7f8dce5067f14c0c9
Parents: c39e4f6
Author: jlowin <jl...@users.noreply.github.com>
Authored: Fri Aug 12 10:34:50 2016 -0700
Committer: Siddharth Anand <si...@yahoo.com>
Committed: Fri Aug 12 10:34:50 2016 -0700

----------------------------------------------------------------------
 airflow/configuration.py                        | 289 +++++++++----------
 docs/configuration.rst                          |  19 ++
 run_unit_tests.sh                               |   2 +-
 tests/configuration.py                          |  14 +-
 tests/contrib/hooks/aws_hook.py                 |   2 +-
 tests/contrib/hooks/emr_hook.py                 |   2 +-
 .../contrib/operators/emr_add_steps_operator.py |   2 +-
 .../operators/emr_create_job_flow_operator.py   |   2 +-
 .../emr_terminate_job_flow_operator.py          |   2 +-
 tests/contrib/operators/fs_operator.py          |   4 +-
 tests/contrib/operators/hipchat_operator.py     |   2 +-
 tests/contrib/operators/ssh_execute_operator.py |   4 +-
 tests/contrib/sensors/emr_base_sensor.py        |   2 +-
 tests/contrib/sensors/emr_job_flow_sensor.py    |   2 +-
 tests/contrib/sensors/emr_step_sensor.py        |   2 +-
 tests/core.py                                   |  24 +-
 tests/jobs.py                                   |   2 +-
 tests/operators/hive_operator.py                |   6 +-
 tests/operators/operators.py                    |   8 +-
 tests/operators/sensors.py                      |   4 +-
 tests/utils.py                                  |  17 +-
 21 files changed, 200 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index e1b3481..516afdb 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -21,6 +21,7 @@ import copy
 import errno
 import logging
 import os
+import six
 import subprocess
 import warnings
 
@@ -88,102 +89,6 @@ def run_command(command):
     return output
 
 
-defaults = {
-    'core': {
-        'unit_test_mode': False,
-        'parallelism': 32,
-        'load_examples': True,
-        'plugins_folder': None,
-        'security': None,
-        'donot_pickle': False,
-        'remote_base_log_folder': '',
-        'remote_log_conn_id': '',
-        'encrypt_s3_logs': False,
-        's3_log_folder': '',  # deprecated!
-        'dag_concurrency': 16,
-        'max_active_runs_per_dag': 16,
-        'executor': 'SequentialExecutor',
-        'dags_are_paused_at_creation': True,
-        'sql_alchemy_pool_size': 5,
-        'sql_alchemy_pool_recycle': 3600,
-        'dagbag_import_timeout': 30,
-        'non_pooled_task_slot_count': 128,
-    },
-    'operators': {
-        'default_owner': 'airflow',
-        'default_cpus': 1,
-        'default_ram': 512,
-        'default_disk': 512,
-        'default_gpus': 0,
-    },
-    'webserver': {
-        'base_url': 'http://localhost:8080',
-        'web_server_host': '0.0.0.0',
-        'web_server_port': '8080',
-        'web_server_worker_timeout': 120,
-        'worker_refresh_batch_size': 1,
-        'worker_refresh_interval': 30,
-        'authenticate': False,
-        'filter_by_owner': False,
-        'owner_mode': 'user',
-        'demo_mode': False,
-        'secret_key': 'airflowified',
-        'expose_config': False,
-        'workers': 4,
-        'worker_class': 'sync',
-        'access_logfile': '',
-        'error_logfile': '',
-        'dag_orientation': 'LR',
-    },
-    'scheduler': {
-        'statsd_on': False,
-        'statsd_host': 'localhost',
-        'statsd_port': 8125,
-        'statsd_prefix': 'airflow',
-        'job_heartbeat_sec': 5,
-        'scheduler_heartbeat_sec': 60,
-        'authenticate': False,
-        'max_threads': 2,
-        'run_duration': 30 * 60,
-        'dag_dir_list_interval': 5 * 60,
-        'print_stats_interval': 30,
-        'min_file_process_interval': 180,
-        'child_process_log_directory': '/tmp/airflow/scheduler/logs'
-    },
-    'celery': {
-        'broker_url': 'sqla+mysql://airflow:airflow@localhost:3306/airflow',
-        'celery_app_name': 'airflow.executors.celery_executor',
-        'celery_result_backend': 'db+mysql://airflow:airflow@localhost:3306/airflow',
-        'celeryd_concurrency': 16,
-        'default_queue': 'default',
-        'flower_host': '0.0.0.0',
-        'flower_port': '5555',
-        'worker_log_server_port': '8793',
-    },
-    'email': {
-        'email_backend': 'airflow.utils.email.send_email_smtp',
-    },
-    'smtp': {
-        'smtp_starttls': True,
-        'smtp_ssl': False,
-        'smtp_user': '',
-        'smtp_password': '',
-    },
-    'kerberos': {
-        'ccache': '/tmp/airflow_krb5_ccache',
-        'principal': 'airflow',                 # gets augmented with fqdn
-        'reinit_frequency': '3600',
-        'kinit_path': 'kinit',
-        'keytab': 'airflow.keytab',
-    },
-    'github_enterprise': {
-        'api_rev': 'v3'
-    },
-    'admin': {
-        'hide_sensitive_variable_fields': True,
-    },
-}
-
 DEFAULT_CONFIG = """\
 [core]
 # The home folder for airflow, default is ~/airflow
@@ -206,8 +111,8 @@ remote_base_log_folder =
 remote_log_conn_id =
 # Use server-side encryption for logs stored in S3
 encrypt_s3_logs = False
-# deprecated option for remote log storage, use remote_base_log_folder instead!
-# s3_log_folder =
+# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
+s3_log_folder =
 
 # The executor class that airflow should use. Choices include
 # SequentialExecutor, LocalExecutor, CeleryExecutor
@@ -262,6 +167,13 @@ donot_pickle = False
 # How long before timing out a python file import while filling the DagBag
 dagbag_import_timeout = 30
 
+# What security module to use (for example kerberos):
+security =
+
+# Turn unit test mode on (overwrites many configuration options with test
+# values at runtime)
+unit_test_mode = False
+
 
 [operators]
 # The default owner assigned to each new operator, unless
@@ -311,7 +223,7 @@ access_logfile = -
 error_logfile = -
 
 # Expose the configuration file in the web server
-expose_config = true
+expose_config = False
 
 # Set to true to turn on authentication:
 # http://pythonhosted.org/airflow/installation.html#web-authentication
@@ -327,17 +239,22 @@ filter_by_owner = False
 # in order to user the ldapgroup mode.
 owner_mode = user
 
-# Default DAG orientation. Valid values are: LR (Left->Right), TB (Top->Bottom),
-# RL (Right->Left), BT (Bottom->Top).
+# Default DAG orientation. Valid values are:
+# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
 dag_orientation = LR
 
+# Puts the webserver in demonstration mode; blurs the names of Operators for
+# privacy.
+demo_mode = False
+
 [email]
 email_backend = airflow.utils.email.send_email_smtp
 
+
 [smtp]
 # If you want airflow to send emails on retries, failure, and you want to use
-# the airflow.utils.email.send_email_smtp function, you have to configure an smtp
-# server here
+# the airflow.utils.email.send_email_smtp function, you have to configure an
+# smtp server here
 smtp_host = localhost
 smtp_starttls = True
 smtp_ssl = False
@@ -346,6 +263,7 @@ smtp_port = 25
 smtp_password = airflow
 smtp_mail_from = airflow@airflow.com
 
+
 [celery]
 # This section only applies if you are using the CeleryExecutor in
 # [core] section above
@@ -384,6 +302,7 @@ flower_port = 5555
 # Default queue that tasks get assigned to and that worker listen on.
 default_queue = default
 
+
 [scheduler]
 # Task instances listen for external kill signal (when you clear tasks
 # from the CLI or the UI), this defines the frequency at which they should
@@ -395,17 +314,27 @@ job_heartbeat_sec = 5
 # how often the scheduler should run (in seconds).
 scheduler_heartbeat_sec = 5
 
+run_duration = 1800
+dag_dir_list_interval = 300
+print_stats_interval = 30
+min_file_process_interval = 180
+
+child_process_log_directory = /tmp/airflow/scheduler/logs
+
 # Statsd (https://github.com/etsy/statsd) integration settings
-# statsd_on =  False
-# statsd_host =  localhost
-# statsd_port =  8125
-# statsd_prefix = airflow
+statsd_on = False
+statsd_host = localhost
+statsd_port = 8125
+statsd_prefix = airflow
 
 # The scheduler can run multiple threads in parallel to schedule dags.
 # This defines how many threads will run. However airflow will never
 # use more threads than the amount of cpu cores available.
 max_threads = 2
 
+authenticate = False
+
+
 [mesos]
 # Mesos master address which MesosExecutor will connect to.
 master = localhost:5050
@@ -443,6 +372,20 @@ authenticate = False
 # default_principal = admin
 # default_secret = admin
 
+
+[kerberos]
+ccache = /tmp/airflow_krb5_ccache
+# gets augmented with fqdn
+principal = airflow
+reinit_frequency = 3600
+kinit_path = kinit
+keytab = airflow.keytab
+
+
+[github_enterprise]
+api_rev = v3
+
+
 [admin]
 # UI to hide sensitive variable fields when set to True
 hide_sensitive_variable_fields = True
@@ -451,12 +394,12 @@ hide_sensitive_variable_fields = True
 
 TEST_CONFIG = """\
 [core]
+unit_test_mode = True
 airflow_home = {AIRFLOW_HOME}
 dags_folder = {TEST_DAGS_FOLDER}
 base_log_folder = {AIRFLOW_HOME}/logs
 executor = SequentialExecutor
 sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
-unit_test_mode = True
 load_examples = True
 donot_pickle = False
 dag_concurrency = 16
@@ -501,11 +444,11 @@ max_threads = 2
 """
 
 
-class ConfigParserWithDefaults(ConfigParser):
+class AirflowConfigParser(ConfigParser):
 
     # These configuration elements can be fetched as the stdout of commands
-    # following the "{section}__{name}__cmd" pattern, the idea behind this is to not
-    # store password on boxes in text files.
+    # following the "{section}__{name}__cmd" pattern, the idea behind this
+    # is to not store password on boxes in text files.
     as_command_stdout = {
         ('core', 'sql_alchemy_conn'),
         ('core', 'fernet_key'),
@@ -513,33 +456,51 @@ class ConfigParserWithDefaults(ConfigParser):
         ('celery', 'celery_result_backend')
     }
 
-    def __init__(self, defaults, *args, **kwargs):
-        self.defaults = defaults
+    def __init__(self, *args, **kwargs):
         ConfigParser.__init__(self, *args, **kwargs)
+        self.read_string(parameterized_config(DEFAULT_CONFIG))
         self.is_validated = False
 
+    def read_string(self, string, source='<string>'):
+        """
+        Read configuration from a string.
+
+        A backwards-compatible version of the ConfigParser.read_string()
+        method that was introduced in Python 3.
+        """
+        # Python 3 added read_string() method
+        if six.PY3:
+            ConfigParser.read_string(self, string, source=source)
+        # Python 2 requires StringIO buffer
+        else:
+            import StringIO
+            self.readfp(StringIO.StringIO(string))
+
     def _validate(self):
         if (
                 self.get("core", "executor") != 'SequentialExecutor' and
                 "sqlite" in self.get('core', 'sql_alchemy_conn')):
-            raise AirflowConfigException("error: cannot use sqlite with the {}".
-                format(self.get('core', 'executor')))
+            raise AirflowConfigException(
+                "error: cannot use sqlite with the {}".format(
+                    self.get('core', 'executor')))
 
         elif (
             self.getboolean("webserver", "authenticate") and
             self.get("webserver", "owner_mode") not in ['user', 'ldapgroup']
         ):
-            raise AirflowConfigException("error: owner_mode option should be either "
-                                         "'user' or 'ldapgroup' "
-                                         "when filtering by owner is set")
+            raise AirflowConfigException(
+                "error: owner_mode option should be either "
+                "'user' or 'ldapgroup' when filtering by owner is set")
 
         elif (
             self.getboolean("webserver", "authenticate") and
             self.get("webserver", "owner_mode").lower() == 'ldapgroup' and
-            self.get("core", "auth_backend") != 'airflow.contrib.auth.backends.ldap_auth'
+            self.get("core", "auth_backend") != (
+                'airflow.contrib.auth.backends.ldap_auth')
         ):
-            raise AirflowConfigException("error: attempt at using ldapgroup "
-                                         "filtering without using the Ldap backend")
+            raise AirflowConfigException(
+                "error: attempt at using ldapgroup "
+                "filtering without using the Ldap backend")
 
         self.is_validated = True
 
@@ -551,18 +512,20 @@ class ConfigParserWithDefaults(ConfigParser):
 
     def _get_cmd_option(self, section, key):
         fallback_key = key + '_cmd'
-        if (
-                (section, key) in ConfigParserWithDefaults.as_command_stdout and
-                self.has_option(section, fallback_key)):
-            command = self.get(section, fallback_key)
-            return run_command(command)
+        # if this is a valid command key...
+        if (section, key) in AirflowConfigParser.as_command_stdout:
+            # if the original key is present, return it no matter what
+            if self.has_option(section, key):
+                return ConfigParser.get(self, section, key)
+            # otherwise, execute the fallback key
+            elif self.has_option(section, fallback_key):
+                command = self.get(section, fallback_key)
+                return run_command(command)
 
     def get(self, section, key, **kwargs):
         section = str(section).lower()
         key = str(key).lower()
 
-        d = self.defaults
-
         # first check environment variables
         option = self._get_env_var_option(section, key)
         if option:
@@ -578,13 +541,9 @@ class ConfigParserWithDefaults(ConfigParser):
         if option:
             return option
 
-        # ...then the defaults
-        if section in d and key in d[section]:
-            return expand_env_var(d[section][key])
-
         else:
-            logging.warn("section/key [{section}/{key}] not found "
-                         "in config".format(**locals()))
+            logging.warning("section/key [{section}/{key}] not found "
+                            "in config".format(**locals()))
 
             raise AirflowConfigException(
                 "section/key [{section}/{key}] not found "
@@ -594,12 +553,14 @@ class ConfigParserWithDefaults(ConfigParser):
         val = str(self.get(section, key)).lower().strip()
         if '#' in val:
             val = val.split('#')[0].strip()
-        if val == "true":
+        if val.lower() in ('t', 'true', '1'):
             return True
-        elif val == "false":
+        elif val.lower() in ('f', 'false', '0'):
             return False
         else:
-            raise AirflowConfigException("Not a boolean.")
+            raise AirflowConfigException(
+                'The value for configuration option "{}:{}" is not a '
+                'boolean (received "{}").'.format(section, key, val))
 
     def getint(self, section, key):
         return int(self.get(section, key))
@@ -633,7 +594,7 @@ class ConfigParserWithDefaults(ConfigParser):
         if display_source:
             for section in cfg:
                 for k, v in cfg[section].items():
-                    cfg[section][k] = (v, 'airflow.cfg')
+                    cfg[section][k] = (v, 'airflow config')
 
         # add env vars and overwrite because they have priority
         for ev in [ev for ev in os.environ if ev.startswith('AIRFLOW__')]:
@@ -643,7 +604,9 @@ class ConfigParserWithDefaults(ConfigParser):
             except ValueError:
                 opt = None
             if opt:
-                if not display_sensitive:
+                if (
+                        not display_sensitive
+                        and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
                     opt = '< hidden >'
                 if display_source:
                     opt = (opt, 'env var')
@@ -651,7 +614,7 @@ class ConfigParserWithDefaults(ConfigParser):
                     {key.lower(): opt})
 
         # add bash commands
-        for (section, key) in ConfigParserWithDefaults.as_command_stdout:
+        for (section, key) in AirflowConfigParser.as_command_stdout:
             opt = self._get_cmd_option(section, key)
             if opt:
                 if not display_sensitive:
@@ -660,18 +623,21 @@ class ConfigParserWithDefaults(ConfigParser):
                     opt = (opt, 'bash cmd')
                 cfg.setdefault(section, OrderedDict()).update({key: opt})
 
-        # add defaults
-        for section in sorted(self.defaults):
-            for key in sorted(self.defaults[section].keys()):
-                if key not in cfg.setdefault(section, OrderedDict()):
-                    opt = str(self.defaults[section][key])
-                    if display_source:
-                        cfg[section][key] = (opt, 'default')
-                    else:
-                        cfg[section][key] = opt
-
         return cfg
 
+    def load_test_config(self):
+        """
+        Load the unit test configuration.
+
+        Note: this is not reversible.
+        """
+        # override any custom settings with defaults
+        self.read_string(parameterized_config(DEFAULT_CONFIG))
+        # then read test config
+        self.read_string(parameterized_config(TEST_CONFIG))
+        # then read any "custom" test settings
+        self.read(TEST_CONFIG_FILE)
+
 
 def mkdir_p(path):
     try:
@@ -732,8 +698,8 @@ if not os.path.isfile(TEST_CONFIG_FILE):
 
 if not os.path.isfile(AIRFLOW_CONFIG):
     # These configuration options are used to generate a default configuration
-    # when it is missing. The right way to change your configuration is to alter
-    # your configuration file, not this code.
+    # when it is missing. The right way to change your configuration is to
+    # alter your configuration file, not this code.
     logging.info("Creating new airflow config file in: " + AIRFLOW_CONFIG)
     with open(AIRFLOW_CONFIG, 'w') as f:
         f.write(parameterized_config(DEFAULT_CONFIG))
@@ -741,14 +707,22 @@ if not os.path.isfile(AIRFLOW_CONFIG):
 logging.info("Reading the config from " + AIRFLOW_CONFIG)
 
 
-def test_mode():
-    conf = ConfigParserWithDefaults(defaults)
-    conf.read(TEST_CONFIG)
-
-conf = ConfigParserWithDefaults(defaults)
+conf = AirflowConfigParser()
 conf.read(AIRFLOW_CONFIG)
 
 
+def load_test_config():
+    """
+    Load the unit test configuration.
+
+    Note: this is not reversible.
+    """
+    conf.load_test_config()
+
+if conf.getboolean('core', 'unit_test_mode'):
+    load_test_config()
+
+
 def get(section, key, **kwargs):
     return conf.get(section, key, **kwargs)
 
@@ -785,6 +759,5 @@ def set(section, option, value):  # noqa
 ########################
 # convenience method to access config entries
 
-
 def get_dags_folder():
     return os.path.expanduser(get('core', 'DAGS_FOLDER'))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/docs/configuration.rst
----------------------------------------------------------------------
diff --git a/docs/configuration.rst b/docs/configuration.rst
index 3eed553..c4a3442 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -4,6 +4,8 @@ Configuration
 Setting up the sandbox in the :doc:`start` section was easy;
 building a production-grade environment requires a bit more work!
 
+.. _setting-options:
+
 Setting Configuration Options
 '''''''''''''''''''''''''''''
 
@@ -228,3 +230,20 @@ integrated with upstart
 .. code-block:: bash
 
     initctl airflow-webserver status
+
+Test Mode
+'''''''''
+Airflow has a fixed set of "test mode" configuration options. You can load these
+at any time by calling ``airflow.configuration.load_test_config()`` (note this
+operation is not reversible!). However, some options (like the DAGS_FOLDER) are
+loaded before you have a chance to call load_test_config(). To eagerly load
+the test configuration, set unit_test_mode in airflow.cfg:
+
+.. code-block:: bash
+
+  [core]
+  unit_test_mode = True
+
+Due to Airflow's automatic environment variable expansion (see :ref:`setting-options`),
+you can also set the env var ``AIRFLOW__CORE__UNIT_TEST_MODE`` to temporarily overwrite
+airflow.cfg.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/run_unit_tests.sh
----------------------------------------------------------------------
diff --git a/run_unit_tests.sh b/run_unit_tests.sh
index f035386..387106e 100755
--- a/run_unit_tests.sh
+++ b/run_unit_tests.sh
@@ -16,7 +16,7 @@
 
 # environment
 export AIRFLOW_HOME=${AIRFLOW_HOME:=~/airflow}
-export AIRFLOW_CONFIG=$AIRFLOW_HOME/unittests.cfg
+export AIRFLOW__CORE__UNIT_TEST_MODE=True
 
 # configuration test
 export AIRFLOW__TESTSECTION__TESTKEY=testvalue

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/configuration.py
----------------------------------------------------------------------
diff --git a/tests/configuration.py b/tests/configuration.py
index 05cd93d..bb0fd17 100644
--- a/tests/configuration.py
+++ b/tests/configuration.py
@@ -19,11 +19,10 @@ import unittest
 from airflow import configuration
 from airflow.configuration import conf
 
-configuration.test_mode()
-
 class ConfTest(unittest.TestCase):
+
     def setup(self):
-        configuration.test_mode()
+        configuration.load_test_config()
 
     def test_env_var_config(self):
         opt = conf.get('testsection', 'testkey')
@@ -38,15 +37,10 @@ class ConfTest(unittest.TestCase):
         # test env vars
         self.assertEqual(cfg_dict['testsection']['testkey'], '< hidden >')
 
-        # test defaults
-        conf.remove_option('core', 'load_examples')
-        cfg_dict = conf.as_dict()
-        self.assertEqual(cfg_dict['core']['load_examples'], 'True')
-
         # test display_source
         cfg_dict = conf.as_dict(display_source=True)
-        self.assertEqual(cfg_dict['core']['unit_test_mode'][1], 'airflow.cfg')
-        self.assertEqual(cfg_dict['core']['load_examples'][1], 'default')
+        self.assertEqual(
+            cfg_dict['core']['load_examples'][1], 'airflow config')
         self.assertEqual(
             cfg_dict['testsection']['testkey'], ('< hidden >', 'env var'))
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/contrib/hooks/aws_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/aws_hook.py b/tests/contrib/hooks/aws_hook.py
index 2a40127..6f13e58 100644
--- a/tests/contrib/hooks/aws_hook.py
+++ b/tests/contrib/hooks/aws_hook.py
@@ -29,7 +29,7 @@ except ImportError:
 class TestAwsHook(unittest.TestCase):
     @mock_emr
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
 
     @unittest.skipIf(mock_emr is None, 'mock_emr package not present')
     @mock_emr

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/contrib/hooks/emr_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/emr_hook.py b/tests/contrib/hooks/emr_hook.py
index fbe91e8..119df99 100644
--- a/tests/contrib/hooks/emr_hook.py
+++ b/tests/contrib/hooks/emr_hook.py
@@ -29,7 +29,7 @@ except ImportError:
 class TestEmrHook(unittest.TestCase):
     @mock_emr
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
 
     @unittest.skipIf(mock_emr is None, 'mock_emr package not present')
     @mock_emr

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/emr_add_steps_operator.py b/tests/contrib/operators/emr_add_steps_operator.py
index 4335e5e..37f9a4c 100644
--- a/tests/contrib/operators/emr_add_steps_operator.py
+++ b/tests/contrib/operators/emr_add_steps_operator.py
@@ -28,7 +28,7 @@ ADD_STEPS_SUCCESS_RETURN = {
 
 class TestEmrAddStepsOperator(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
 
         # Mock out the emr_client (moto has incorrect response)
         mock_emr_client = MagicMock()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/emr_create_job_flow_operator.py b/tests/contrib/operators/emr_create_job_flow_operator.py
index ac32125..4aa4cd2 100644
--- a/tests/contrib/operators/emr_create_job_flow_operator.py
+++ b/tests/contrib/operators/emr_create_job_flow_operator.py
@@ -28,7 +28,7 @@ RUN_JOB_FLOW_SUCCESS_RETURN = {
 
 class TestEmrCreateJobFlowOperator(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
 
         # Mock out the emr_client (moto has incorrect response)
         mock_emr_client = MagicMock()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/emr_terminate_job_flow_operator.py b/tests/contrib/operators/emr_terminate_job_flow_operator.py
index 38f20e2..94c0124 100644
--- a/tests/contrib/operators/emr_terminate_job_flow_operator.py
+++ b/tests/contrib/operators/emr_terminate_job_flow_operator.py
@@ -27,7 +27,7 @@ TERMINATE_SUCCESS_RETURN = {
 
 class TestEmrTerminateJobFlowOperator(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
 
         # Mock out the emr_client (moto has incorrect response)
         mock_emr_client = MagicMock()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/fs_operator.py b/tests/contrib/operators/fs_operator.py
index d7f92aa..2e79f86 100644
--- a/tests/contrib/operators/fs_operator.py
+++ b/tests/contrib/operators/fs_operator.py
@@ -23,7 +23,7 @@ from airflow.contrib.operators.fs_operator import FileSensor
 
 TEST_DAG_ID = 'unit_tests'
 DEFAULT_DATE = datetime(2015, 1, 1)
-configuration.test_mode()
+configuration.load_test_config()
 
 
 def reset(dag_id=TEST_DAG_ID):
@@ -37,7 +37,7 @@ reset()
 
 class FileSensorTest(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         from airflow.contrib.hooks.fs_hook import FSHook
         hook = FSHook()
         args = {

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/contrib/operators/hipchat_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/hipchat_operator.py b/tests/contrib/operators/hipchat_operator.py
index d0d4583..65a2edd 100644
--- a/tests/contrib/operators/hipchat_operator.py
+++ b/tests/contrib/operators/hipchat_operator.py
@@ -31,7 +31,7 @@ except ImportError:
 
 class HipChatOperatorTest(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
 
     @unittest.skipIf(mock is None, 'mock package not present')
     @mock.patch('requests.request')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/contrib/operators/ssh_execute_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/ssh_execute_operator.py b/tests/contrib/operators/ssh_execute_operator.py
index e0387d0..355f163 100644
--- a/tests/contrib/operators/ssh_execute_operator.py
+++ b/tests/contrib/operators/ssh_execute_operator.py
@@ -23,7 +23,7 @@ from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
 
 TEST_DAG_ID = 'unit_tests'
 DEFAULT_DATE = datetime(2015, 1, 1)
-configuration.test_mode()
+configuration.load_test_config()
 
 
 def reset(dag_id=TEST_DAG_ID):
@@ -38,7 +38,7 @@ reset()
 
 class SSHExecuteOperatorTest(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         from airflow.contrib.hooks.ssh_hook import SSHHook
         hook = SSHHook()
         hook.no_host_key_check = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/emr_base_sensor.py b/tests/contrib/sensors/emr_base_sensor.py
index ca86a23..0b8ad2f 100644
--- a/tests/contrib/sensors/emr_base_sensor.py
+++ b/tests/contrib/sensors/emr_base_sensor.py
@@ -21,7 +21,7 @@ from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
 
 class TestEmrBaseSensor(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
 
     def test_subclasses_that_implment_required_methods_and_constants_succeed_when_response_is_good(self):
         class EmrBaseSensorSubclass(EmrBaseSensor):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/emr_job_flow_sensor.py b/tests/contrib/sensors/emr_job_flow_sensor.py
index 068f554..f993786 100644
--- a/tests/contrib/sensors/emr_job_flow_sensor.py
+++ b/tests/contrib/sensors/emr_job_flow_sensor.py
@@ -87,7 +87,7 @@ DESCRIBE_CLUSTER_TERMINATED_RETURN = {
 
 class TestEmrJobFlowSensor(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
 
         # Mock out the emr_client (moto has incorrect response)
         self.mock_emr_client = MagicMock()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/emr_step_sensor.py b/tests/contrib/sensors/emr_step_sensor.py
index 4b0a059..58ee461 100644
--- a/tests/contrib/sensors/emr_step_sensor.py
+++ b/tests/contrib/sensors/emr_step_sensor.py
@@ -82,7 +82,7 @@ DESCRIBE_JOB_STEP_COMPLETED_RETURN = {
 
 class TestEmrStepSensor(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
 
         # Mock out the emr_client (moto has incorrect response)
         self.mock_emr_client = MagicMock()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 63975c5..98f0e06 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -35,7 +35,7 @@ from airflow import configuration
 from airflow.executors import SequentialExecutor, LocalExecutor
 from airflow.models import Variable
 
-configuration.test_mode()
+configuration.load_test_config()
 from airflow import jobs, models, DAG, utils, macros, settings, exceptions
 from airflow.models import BaseOperator
 from airflow.operators.bash_operator import BashOperator
@@ -116,7 +116,7 @@ class CoreTest(unittest.TestCase):
                               "num_runs": 1}
 
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         self.dagbag = models.DagBag(
             dag_folder=DEV_NULL, include_examples=True)
         self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
@@ -836,7 +836,7 @@ class CoreTest(unittest.TestCase):
 
 class CliTests(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         app = application.create_app()
         app.config['TESTING'] = True
         self.parser = cli.CLIFactory.get_parser()
@@ -1003,7 +1003,7 @@ class CliTests(unittest.TestCase):
 
 class WebUiTests(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         configuration.conf.set("webserver", "authenticate", "False")
         app = application.create_app()
         app.config['TESTING'] = True
@@ -1226,7 +1226,7 @@ class WebPasswordAuthTest(unittest.TestCase):
         self.assertEqual(response.status_code, 302)
 
     def tearDown(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         session = Session()
         session.query(models.User).delete()
         session.commit()
@@ -1310,7 +1310,7 @@ class WebLdapAuthTest(unittest.TestCase):
         assert 'Connections' in response.data.decode('utf-8')
 
     def tearDown(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         session = Session()
         session.query(models.User).delete()
         session.commit()
@@ -1346,7 +1346,7 @@ class LdapGroupTest(unittest.TestCase):
             assert set(auth.ldap_groups) == set(users[user])
 
     def tearDown(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         configuration.conf.set("webserver", "authenticate", "False")
 
 
@@ -1365,7 +1365,7 @@ class FakeSession(object):
 
 class HttpOpSensorTest(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO}
         dag = DAG(TEST_DAG_ID, default_args=args)
         self.dag = dag
@@ -1419,7 +1419,7 @@ class FakeWebHDFSHook(object):
 
 class ConnectionTest(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         utils.db.initdb()
         os.environ['AIRFLOW_CONN_TEST_URI'] = (
             'postgres://username:password@ec2.compute.com:5432/the_database')
@@ -1475,7 +1475,7 @@ class ConnectionTest(unittest.TestCase):
 
 class WebHDFSHookTest(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
 
     def test_simple_init(self):
         from airflow.hooks.webhdfs_hook import WebHDFSHook
@@ -1498,7 +1498,7 @@ except ImportError:
                  "Skipping test because S3Hook is not installed")
 class S3HookTest(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         self.s3_test_url = "s3://test/this/is/not/a-real-key.txt"
 
     def test_parse_s3_url(self):
@@ -1523,7 +1523,7 @@ conn.sendall(b'hello')
 
 class SSHHookTest(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         from airflow.contrib.hooks.ssh_hook import SSHHook
         self.hook = SSHHook()
         self.hook.no_host_key_check = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 7a8a57e..f7a0a26 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -37,7 +37,7 @@ from airflow.utils.timeout import timeout
 from tests.executor.test_executor import TestExecutor
 
 from airflow import configuration
-configuration.test_mode()
+configuration.load_test_config()
 
 try:
     from unittest import mock

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index c5a0f0e..e52ca54 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -20,7 +20,7 @@ import unittest
 import six
 
 from airflow import DAG, configuration, operators, utils
-configuration.test_mode()
+configuration.load_test_config()
 
 import os
 import unittest
@@ -39,7 +39,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
 
     class HiveServer2Test(unittest.TestCase):
         def setUp(self):
-            configuration.test_mode()
+            configuration.load_test_config()
 
         def test_select_conn(self):
             from airflow.hooks.hive_hooks import HiveServer2Hook
@@ -71,7 +71,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
     class HivePrestoTest(unittest.TestCase):
 
         def setUp(self):
-            configuration.test_mode()
+            configuration.load_test_config()
             args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
             dag = DAG('test_dag_id', default_args=args)
             self.dag = dag

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 8ca47bd..9ecde52 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -21,7 +21,7 @@ import six
 
 from airflow import DAG, configuration, operators, utils
 from airflow.utils.tests import skipUnlessImported
-configuration.test_mode()
+configuration.load_test_config()
 
 import os
 import unittest
@@ -36,7 +36,7 @@ TEST_DAG_ID = 'unit_test_dag'
 @skipUnlessImported('airflow.operators.mysql_operator', 'MySqlOperator')
 class MySqlTest(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         args = {
             'owner': 'airflow',
             'mysql_conn_id': 'airflow_db',
@@ -99,7 +99,7 @@ class MySqlTest(unittest.TestCase):
 @skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
 class PostgresTest(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
         dag = DAG(TEST_DAG_ID, default_args=args)
         self.dag = dag
@@ -166,7 +166,7 @@ class TransferTests(unittest.TestCase):
     cluster = None
 
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
         dag = DAG(TEST_DAG_ID, default_args=args)
         self.dag = dag

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index 325ee8d..aa65cfe 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -26,7 +26,7 @@ from airflow.utils.decorators import apply_defaults
 from airflow.exceptions import (AirflowException,
                                 AirflowSensorTimeout,
                                 AirflowSkipException)
-configuration.test_mode()
+configuration.load_test_config()
 
 DEFAULT_DATE = datetime(2015, 1, 1)
 TEST_DAG_ID = 'unit_test_dag'
@@ -69,7 +69,7 @@ class TimeoutTestSensor(BaseSensorOperator):
 
 class SensorTimeoutTest(unittest.TestCase):
     def setUp(self):
-        configuration.test_mode()
+        configuration.load_test_config()
         args = {
             'owner': 'airflow',
             'start_date': DEFAULT_DATE

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7662cd8c/tests/utils.py
----------------------------------------------------------------------
diff --git a/tests/utils.py b/tests/utils.py
index ebed15b..11c24fe 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -57,8 +57,11 @@ class LogUtilsTest(unittest.TestCase):
             glog.parse_gcs_url('gs://bucket/'),
             ('bucket', ''))
 
-
 class OperatorResourcesTest(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+
     def test_all_resources_specified(self):
         resources = Resources(cpus=1, ram=2, disk=3, gpus=4)
         self.assertEqual(resources.cpus.qty, 1)
@@ -70,21 +73,21 @@ class OperatorResourcesTest(unittest.TestCase):
         resources = Resources(cpus=0, disk=1)
         self.assertEqual(resources.cpus.qty, 0)
         self.assertEqual(resources.ram.qty,
-                         configuration.defaults['operators']['default_ram'])
+                         configuration.getint('operators', 'default_ram'))
         self.assertEqual(resources.disk.qty, 1)
         self.assertEqual(resources.gpus.qty,
-                         configuration.defaults['operators']['default_gpus'])
+                         configuration.getint('operators', 'default_gpus'))
 
     def test_no_resources_specified(self):
         resources = Resources()
         self.assertEqual(resources.cpus.qty,
-                         configuration.defaults['operators']['default_cpus'])
+                         configuration.getint('operators', 'default_cpus'))
         self.assertEqual(resources.ram.qty,
-                         configuration.defaults['operators']['default_ram'])
+                         configuration.getint('operators', 'default_ram'))
         self.assertEqual(resources.disk.qty,
-                         configuration.defaults['operators']['default_disk'])
+                         configuration.getint('operators', 'default_disk'))
         self.assertEqual(resources.gpus.qty,
-                         configuration.defaults['operators']['default_gpus'])
+                         configuration.getint('operators', 'default_gpus'))
 
     def test_negative_resource_qty(self):
         with self.assertRaises(AirflowException):