Posted to commits@airflow.apache.org by xu...@apache.org on 2017/03/19 15:20:44 UTC

[1/3] incubator-airflow git commit: [AIRFLOW-1005] Improve Airflow startup time

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7d11444a5 -> 23a16f7ad


[AIRFLOW-1005] Improve Airflow startup time

Airflow's startup time can be reduced by 50% by
deferring imports of Cryptography (and relatedly,
not generating Fernet keys unless we have to) and
Alembic.
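
(For context, below is a minimal sketch of the deferred-import pattern this
commit applies. It mirrors the generate_fernet_key() change in the diff that
follows, slightly simplified for illustration; it is not the exact committed
code.)

    def generate_fernet_key():
        # Import cryptography only when a key is actually needed, so that
        # "import airflow" does not pay the import cost up front.
        try:
            from cryptography.fernet import Fernet
        except ImportError:
            # Optional dependency missing: fall back to the plain-text sentinel.
            return "cryptography_not_found_storing_passwords_in_plain_text"
        return Fernet.generate_key().decode()

(A quick way to see the effect is to time "python -c 'import airflow'" before
and after the change.)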


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/996dd309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/996dd309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/996dd309

Branch: refs/heads/master
Commit: 996dd309331b010b15e34b99222430283ad7d8a4
Parents: a8027a3
Author: Jeremiah Lowin <jl...@apache.org>
Authored: Fri Mar 17 18:25:24 2017 -0400
Committer: Jeremiah Lowin <jl...@apache.org>
Committed: Sun Mar 19 10:06:33 2017 -0400

----------------------------------------------------------------------
 airflow/configuration.py | 30 ++++++++++++++++------------
 airflow/models.py        | 46 ++++++++++++++++++++++++++++---------------
 airflow/utils/db.py      | 11 ++++++-----
 3 files changed, 53 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/996dd309/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 895c08d..9c7a03e 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -43,18 +43,16 @@ warnings.filterwarnings(
     action='default', category=PendingDeprecationWarning, module='airflow')
 
 
-try:
-    from cryptography.fernet import Fernet
-except ImportError:
-    pass
-
-
 def generate_fernet_key():
     try:
-        FERNET_KEY = Fernet.generate_key().decode()
+        from cryptography.fernet import Fernet
+    except ImportError:
+        pass
+    try:
+        key = Fernet.generate_key().decode()
     except NameError:
-        FERNET_KEY = "cryptography_not_found_storing_passwords_in_plain_text"
-    return FERNET_KEY
+        key = "cryptography_not_found_storing_passwords_in_plain_text"
+    return key
 
 
 def expand_env_var(env_var):
@@ -774,14 +772,19 @@ def parameterized_config(template):
     current scope
     :param template: a config content templated with {{variables}}
     """
-    FERNET_KEY = generate_fernet_key()
     all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
+    if 'FERNET_KEY' not in all_vars:
+        all_vars['FERNET_KEY'] = ''
     return template.format(**all_vars)
 
 TEST_CONFIG_FILE = AIRFLOW_HOME + '/unittests.cfg'
+# only generate a Fernet key if we need to create a new config file
+if not os.path.isfile(TEST_CONFIG_FILE) or not os.path.isfile(AIRFLOW_CONFIG):
+    FERNET_KEY = generate_fernet_key()
 if not os.path.isfile(TEST_CONFIG_FILE):
-    logging.info("Creating new airflow config file for unit tests in: " +
-                 TEST_CONFIG_FILE)
+    logging.info(
+        'Creating new Airflow config file for unit tests in: {}'.format(
+            TEST_CONFIG_FILE))
     with open(TEST_CONFIG_FILE, 'w') as f:
         f.write(parameterized_config(TEST_CONFIG))
 
@@ -789,7 +792,8 @@ if not os.path.isfile(AIRFLOW_CONFIG):
     # These configuration options are used to generate a default configuration
     # when it is missing. The right way to change your configuration is to
     # alter your configuration file, not this code.
-    logging.info("Creating new airflow config file in: " + AIRFLOW_CONFIG)
+    logging.info('Creating new Airflow config file in: {}'.format(
+        AIRFLOW_CONFIG))
     with open(AIRFLOW_CONFIG, 'w') as f:
         f.write(parameterized_config(DEFAULT_CONFIG))
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/996dd309/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 561b002..f2d955b 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
@@ -44,7 +45,6 @@ import textwrap
 import traceback
 import warnings
 import hashlib
-
 from urllib.parse import urlparse
 
 from sqlalchemy import (
@@ -66,6 +66,7 @@ from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
+
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
 from airflow.utils.dates import cron_presets, date_range as utils_date_range
 from airflow.utils.db import provide_session
@@ -85,13 +86,17 @@ XCOM_RETURN_KEY = 'return_value'
 
 Stats = settings.Stats
 
-ENCRYPTION_ON = False
-try:
+
+def get_fernet():
+    """
+    Deferred load of Fernet key.
+
+    This function could fail either because Cryptography is not installed
+    or because the Fernet key is invalid.
+    """
     from cryptography.fernet import Fernet
-    FERNET = Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8'))
-    ENCRYPTION_ON = True
-except:
-    pass
+    return Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8'))
+
 
 if 'mysql' in settings.SQL_ALCHEMY_CONN:
     LongText = LONGTEXT
@@ -572,18 +577,21 @@ class Connection(Base):
 
     def get_password(self):
         if self._password and self.is_encrypted:
-            if not ENCRYPTION_ON:
+            try:
+                fernet = get_fernet()
+            except:
                 raise AirflowException(
                     "Can't decrypt encrypted password for login={}, \
                     FERNET_KEY configuration is missing".format(self.login))
-            return FERNET.decrypt(bytes(self._password, 'utf-8')).decode()
+            return fernet.decrypt(bytes(self._password, 'utf-8')).decode()
         else:
             return self._password
 
     def set_password(self, value):
         if value:
             try:
-                self._password = FERNET.encrypt(bytes(value, 'utf-8')).decode()
+                fernet = get_fernet()
+                self._password = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_encrypted = True
             except NameError:
                 self._password = value
@@ -596,18 +604,21 @@ class Connection(Base):
 
     def get_extra(self):
         if self._extra and self.is_extra_encrypted:
-            if not ENCRYPTION_ON:
+            try:
+                fernet = get_fernet()
+            except:
                 raise AirflowException(
                     "Can't decrypt `extra` params for login={},\
                     FERNET_KEY configuration is missing".format(self.login))
-            return FERNET.decrypt(bytes(self._extra, 'utf-8')).decode()
+            return fernet.decrypt(bytes(self._extra, 'utf-8')).decode()
         else:
             return self._extra
 
     def set_extra(self, value):
         if value:
             try:
-                self._extra = FERNET.encrypt(bytes(value, 'utf-8')).decode()
+                fernet = get_fernet()
+                self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_extra_encrypted = True
             except NameError:
                 self._extra = value
@@ -3556,18 +3567,21 @@ class Variable(Base):
 
     def get_val(self):
         if self._val and self.is_encrypted:
-            if not ENCRYPTION_ON:
+            try:
+                fernet = get_fernet()
+            except:
                 raise AirflowException(
                     "Can't decrypt _val for key={}, FERNET_KEY configuration \
                     missing".format(self.key))
-            return FERNET.decrypt(bytes(self._val, 'utf-8')).decode()
+            return fernet.decrypt(bytes(self._val, 'utf-8')).decode()
         else:
             return self._val
 
     def set_val(self, value):
         if value:
             try:
-                self._val = FERNET.encrypt(bytes(value, 'utf-8')).decode()
+                fernet = get_fernet()
+                self._val = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_encrypted = True
             except NameError:
                 self._val = value

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/996dd309/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 977a949..49a8d62 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -22,16 +22,11 @@ from functools import wraps
 import logging
 import os
 
-from alembic.config import Config
-from alembic import command
-from alembic.migration import MigrationContext
-
 from sqlalchemy import event, exc
 from sqlalchemy.pool import Pool
 
 from airflow import settings
 
-
 def provide_session(func):
     """
     Function decorator that provides a session if it isn't provided.
@@ -284,6 +279,10 @@ def initdb():
 
 
 def upgradedb():
+    # alembic adds significant import time, so we import it lazily
+    from alembic import command
+    from alembic.config import Config
+
     logging.info("Creating tables")
     current_dir = os.path.dirname(os.path.abspath(__file__))
     package_dir = os.path.normpath(os.path.join(current_dir, '..'))
@@ -299,6 +298,8 @@ def resetdb():
     Clear out the database
     '''
     from airflow import models
+    # alembic adds significant import time, so we import it lazily
+    from alembic.migration import MigrationContext
 
     logging.info("Dropping tables that exist")
     models.Base.metadata.drop_all(settings.engine)


[3/3] incubator-airflow git commit: Merge pull request #2166 from jlowin/speedup

Posted by xu...@apache.org.
Merge pull request #2166 from jlowin/speedup


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/23a16f7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/23a16f7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/23a16f7a

Branch: refs/heads/master
Commit: 23a16f7ad329830016b5aac68fbc27fd37a6fe3b
Parents: 7d11444 1da7450
Author: Li Xuanji <xu...@gmail.com>
Authored: Sun Mar 19 11:20:30 2017 -0400
Committer: Li Xuanji <xu...@gmail.com>
Committed: Sun Mar 19 11:20:30 2017 -0400

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg | 386 +++++++++++++++++
 airflow/config_templates/default_test.cfg    |  86 ++++
 airflow/configuration.py                     | 484 ++--------------------
 airflow/models.py                            |  46 +-
 airflow/utils/db.py                          |  11 +-
 5 files changed, 540 insertions(+), 473 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-airflow git commit: [AIRFLOW-1006] Move config templates to separate files

Posted by xu...@apache.org.
[AIRFLOW-1006] Move config templates to separate files

Config templates are easier to work with as standalone files
than as giant strings inside configuration.py.
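
(As a rough sketch of what this enables, the templates can be read from the
package directory and filled in with str.format(), as the configuration.py
diff below does. The helper name and the example values here are illustrative
only, not part of the committed change.)

    import os

    _templates_dir = os.path.join(os.path.dirname(__file__), 'config_templates')

    def load_config_template(name, **variables):
        # Read a packaged template and substitute its {VARIABLE} placeholders.
        with open(os.path.join(_templates_dir, name)) as f:
            return f.read().format(**variables)

    # Illustrative values for the two placeholders used by default_airflow.cfg:
    default_cfg = load_config_template(
        'default_airflow.cfg',
        AIRFLOW_HOME='/home/user/airflow',
        FERNET_KEY='')

(The committed change additionally strips everything above the
"# ----------------------- TEMPLATE BEGINS HERE -----------------------"
marker before writing the generated airflow.cfg / unittests.cfg.)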


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1da7450c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1da7450c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1da7450c

Branch: refs/heads/master
Commit: 1da7450c96c631a75da96ba00f6d3ad116c9061b
Parents: 996dd30
Author: Jeremiah Lowin <jl...@apache.org>
Authored: Fri Mar 17 18:41:44 2017 -0400
Committer: Jeremiah Lowin <jl...@apache.org>
Committed: Sun Mar 19 10:06:38 2017 -0400

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg | 386 ++++++++++++++++++
 airflow/config_templates/default_test.cfg    |  86 ++++
 airflow/configuration.py                     | 458 +---------------------
 3 files changed, 489 insertions(+), 441 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1da7450c/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
new file mode 100644
index 0000000..af240a7
--- /dev/null
+++ b/airflow/config_templates/default_airflow.cfg
@@ -0,0 +1,386 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This is the template for Airflow's default configuration. When Airflow is
+# imported, it looks for a configuration file at $AIRFLOW_HOME/airflow.cfg. If
+# it doesn't exist, Airflow uses this template to generate it by replacing
+# variables in curly braces with their global values from configuration.py.
+
+# Users should not modify this file; they should customize the generated
+# airflow.cfg instead.
+
+
+# ----------------------- TEMPLATE BEGINS HERE -----------------------
+
+[core]
+# The home folder for airflow, default is ~/airflow
+airflow_home = {AIRFLOW_HOME}
+
+# The folder where your airflow pipelines live, most likely a
+# subfolder in a code repository
+# This path must be absolute
+dags_folder = {AIRFLOW_HOME}/dags
+
+# The folder where airflow should store its log files
+# This path must be absolute
+base_log_folder = {AIRFLOW_HOME}/logs
+
+# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
+# must supply a remote location URL (starting with either 's3://...' or
+# 'gs://...') and an Airflow connection id that provides access to the storage
+# location.
+remote_base_log_folder =
+remote_log_conn_id =
+# Use server-side encryption for logs stored in S3
+encrypt_s3_logs = False
+# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
+s3_log_folder =
+
+# The executor class that airflow should use. Choices include
+# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
+executor = SequentialExecutor
+
+# The SqlAlchemy connection string to the metadata database.
+# SqlAlchemy supports many different database engine, more information
+# their website
+sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db
+
+# The SqlAlchemy pool size is the maximum number of database connections
+# in the pool.
+sql_alchemy_pool_size = 5
+
+# The SqlAlchemy pool recycle is the number of seconds a connection
+# can be idle in the pool before it is invalidated. This config does
+# not apply to sqlite.
+sql_alchemy_pool_recycle = 3600
+
+# The amount of parallelism as a setting to the executor. This defines
+# the max number of task instances that should run simultaneously
+# on this airflow installation
+parallelism = 32
+
+# The number of task instances allowed to run concurrently by the scheduler
+dag_concurrency = 16
+
+# Are DAGs paused by default at creation
+dags_are_paused_at_creation = True
+
+# When not using pools, tasks are run in the "default pool",
+# whose size is guided by this config element
+non_pooled_task_slot_count = 128
+
+# The maximum number of active DAG runs per DAG
+max_active_runs_per_dag = 16
+
+# Whether to load the examples that ship with Airflow. It's good to
+# get started, but you probably want to set this to False in a production
+# environment
+load_examples = True
+
+# Where your Airflow plugins are stored
+plugins_folder = {AIRFLOW_HOME}/plugins
+
+# Secret key to save connection passwords in the db
+fernet_key = {FERNET_KEY}
+
+# Whether to disable pickling dags
+donot_pickle = False
+
+# How long before timing out a python file import while filling the DagBag
+dagbag_import_timeout = 30
+
+# The class to use for running task instances in a subprocess
+task_runner = BashTaskRunner
+
+# If set, tasks without a `run_as_user` argument will be run with this user
+# Can be used to de-elevate a sudo user running Airflow when executing tasks
+default_impersonation =
+
+# What security module to use (for example kerberos):
+security =
+
+# Turn unit test mode on (overwrites many configuration options with test
+# values at runtime)
+unit_test_mode = False
+
+[cli]
+# In what way should the cli access the API. The LocalClient will use the
+# database directly, while the json_client will use the api running on the
+# webserver
+api_client = airflow.api.client.local_client
+endpoint_url = http://localhost:8080
+
+[api]
+# How to authenticate users of the API
+auth_backend = airflow.api.auth.backend.default
+
+[operators]
+# The default owner assigned to each new operator, unless
+# provided explicitly or passed via `default_args`
+default_owner = Airflow
+default_cpus = 1
+default_ram = 512
+default_disk = 512
+default_gpus = 0
+
+
+[webserver]
+# The base url of your website as airflow cannot guess what domain or
+# cname you are using. This is used in automated emails that
+# airflow sends to point links to the right web server
+base_url = http://localhost:8080
+
+# The ip specified when starting the web server
+web_server_host = 0.0.0.0
+
+# The port on which to run the web server
+web_server_port = 8080
+
+# Paths to the SSL certificate and key for the web server. When both are
+# provided SSL will be enabled. This does not change the web server port.
+web_server_ssl_cert =
+web_server_ssl_key =
+
+# Number of seconds the gunicorn webserver waits before timing out on a worker
+web_server_worker_timeout = 120
+
+# Number of workers to refresh at a time. When set to 0, worker refresh is
+# disabled. When nonzero, airflow periodically refreshes webserver workers by
+# bringing up new ones and killing old ones.
+worker_refresh_batch_size = 1
+
+# Number of seconds to wait before refreshing a batch of workers.
+worker_refresh_interval = 30
+
+# Secret key used to run your flask app
+secret_key = temporary_key
+
+# Number of workers to run the Gunicorn web server
+workers = 4
+
+# The worker class gunicorn should use. Choices include
+# sync (default), eventlet, gevent
+worker_class = sync
+
+# Log files for the gunicorn webserver. '-' means log to stderr.
+access_logfile = -
+error_logfile = -
+
+# Expose the configuration file in the web server
+expose_config = False
+
+# Set to true to turn on authentication:
+# http://pythonhosted.org/airflow/security.html#web-authentication
+authenticate = False
+
+# Filter the list of dags by owner name (requires authentication to be enabled)
+filter_by_owner = False
+
+# Filtering mode. Choices include user (default) and ldapgroup.
+# Ldap group filtering requires using the ldap backend
+#
+# Note that the ldap server needs the "memberOf" overlay to be set up
+# in order to user the ldapgroup mode.
+owner_mode = user
+
+# Default DAG view.  Valid values are:
+# tree, graph, duration, gantt, landing_times
+dag_default_view = tree
+
+# Default DAG orientation. Valid values are:
+# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
+dag_orientation = LR
+
+# Puts the webserver in demonstration mode; blurs the names of Operators for
+# privacy.
+demo_mode = False
+
+# The amount of time (in secs) webserver will wait for initial handshake
+# while fetching logs from other worker machine
+log_fetch_timeout_sec = 5
+
+# By default, the webserver shows paused DAGs. Flip this to hide paused
+# DAGs by default
+hide_paused_dags_by_default = False
+
+[email]
+email_backend = airflow.utils.email.send_email_smtp
+
+
+[smtp]
+# If you want airflow to send emails on retries, failure, and you want to use
+# the airflow.utils.email.send_email_smtp function, you have to configure an
+# smtp server here
+smtp_host = localhost
+smtp_starttls = True
+smtp_ssl = False
+# Uncomment and set the user/pass settings if you want to use SMTP AUTH
+# smtp_user = airflow
+# smtp_password = airflow
+smtp_port = 25
+smtp_mail_from = airflow@airflow.com
+
+
+[celery]
+# This section only applies if you are using the CeleryExecutor in
+# [core] section above
+
+# The app name that will be used by celery
+celery_app_name = airflow.executors.celery_executor
+
+# The concurrency that will be used when starting workers with the
+# "airflow worker" command. This defines the number of task instances that
+# a worker will take, so size up your workers based on the resources on
+# your worker box and the nature of your tasks
+celeryd_concurrency = 16
+
+# When you start an airflow worker, airflow starts a tiny web server
+# subprocess to serve the workers local log files to the airflow main
+# web server, who then builds pages and sends them to users. This defines
+# the port on which the logs are served. It needs to be unused, and open
+# visible from the main web server to connect into the workers.
+worker_log_server_port = 8793
+
+# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
+# a sqlalchemy database. Refer to the Celery documentation for more
+# information.
+broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
+
+# Another key Celery setting
+celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
+
+# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
+# it `airflow flower`. This defines the IP that Celery Flower runs on
+flower_host = 0.0.0.0
+
+# This defines the port that Celery Flower runs on
+flower_port = 5555
+
+# Default queue that tasks get assigned to and that worker listen on.
+default_queue = default
+
+
+[dask]
+# This section only applies if you are using the DaskExecutor in
+# [core] section above
+
+# The IP address and port of the Dask cluster's scheduler.
+cluster_address = 127.0.0.1:8786
+
+
+[scheduler]
+# Task instances listen for external kill signal (when you clear tasks
+# from the CLI or the UI), this defines the frequency at which they should
+# listen (in seconds).
+job_heartbeat_sec = 5
+
+# The scheduler constantly tries to trigger new tasks (look at the
+# scheduler section in the docs for more information). This defines
+# how often the scheduler should run (in seconds).
+scheduler_heartbeat_sec = 5
+
+# after how much time should the scheduler terminate in seconds
+# -1 indicates to run continuously (see also num_runs)
+run_duration = -1
+
+# after how much time a new DAGs should be picked up from the filesystem
+min_file_process_interval = 0
+
+dag_dir_list_interval = 300
+
+# How often should stats be printed to the logs
+print_stats_interval = 30
+
+child_process_log_directory = {AIRFLOW_HOME}/logs/scheduler
+
+# Local task jobs periodically heartbeat to the DB. If the job has
+# not heartbeat in this many seconds, the scheduler will mark the
+# associated task instance as failed and will re-schedule the task.
+scheduler_zombie_task_threshold = 300
+
+# Turn off scheduler catchup by setting this to False.
+# Default behavior is unchanged and
+# Command Line Backfills still work, but the scheduler
+# will not do scheduler catchup if this is False,
+# however it can be set on a per DAG basis in the
+# DAG definition (catchup)
+catchup_by_default = True
+
+# Statsd (https://github.com/etsy/statsd) integration settings
+statsd_on = False
+statsd_host = localhost
+statsd_port = 8125
+statsd_prefix = airflow
+
+# The scheduler can run multiple threads in parallel to schedule dags.
+# This defines how many threads will run. However airflow will never
+# use more threads than the amount of cpu cores available.
+max_threads = 2
+
+authenticate = False
+
+
+[mesos]
+# Mesos master address which MesosExecutor will connect to.
+master = localhost:5050
+
+# The framework name which Airflow scheduler will register itself as on mesos
+framework_name = Airflow
+
+# Number of cpu cores required for running one task instance using
+# 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
+# command on a mesos slave
+task_cpu = 1
+
+# Memory in MB required for running one task instance using
+# 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
+# command on a mesos slave
+task_memory = 256
+
+# Enable framework checkpointing for mesos
+# See http://mesos.apache.org/documentation/latest/slave-recovery/
+checkpoint = False
+
+# Failover timeout in milliseconds.
+# When checkpointing is enabled and this option is set, Mesos waits
+# until the configured timeout for
+# the MesosExecutor framework to re-register after a failover. Mesos
+# shuts down running tasks if the
+# MesosExecutor framework fails to re-register within this timeframe.
+# failover_timeout = 604800
+
+# Enable framework authentication for mesos
+# See http://mesos.apache.org/documentation/latest/configuration/
+authenticate = False
+
+# Mesos credentials, if authentication is enabled
+# default_principal = admin
+# default_secret = admin
+
+
+[kerberos]
+ccache = /tmp/airflow_krb5_ccache
+# gets augmented with fqdn
+principal = airflow
+reinit_frequency = 3600
+kinit_path = kinit
+keytab = airflow.keytab
+
+
+[github_enterprise]
+api_rev = v3
+
+
+[admin]
+# UI to hide sensitive variable fields when set to True
+hide_sensitive_variable_fields = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1da7450c/airflow/config_templates/default_test.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
new file mode 100644
index 0000000..ecf7f4e
--- /dev/null
+++ b/airflow/config_templates/default_test.cfg
@@ -0,0 +1,86 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This is the template for Airflow's unit test configuration. When Airflow runs
+# unit tests, it looks for a configuration file at $AIRFLOW_HOME/unittests.cfg.
+# If it doesn't exist, Airflow uses this template to generate it by replacing
+# variables in curly braces with their global values from configuration.py.
+
+# Users should not modify this file; they should customize the generated
+# unittests.cfg instead.
+
+
+# ----------------------- TEMPLATE BEGINS HERE -----------------------
+
+[core]
+unit_test_mode = True
+airflow_home = {AIRFLOW_HOME}
+dags_folder = {TEST_DAGS_FOLDER}
+plugins_folder = {TEST_PLUGINS_FOLDER}
+base_log_folder = {AIRFLOW_HOME}/logs
+executor = SequentialExecutor
+sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
+load_examples = True
+donot_pickle = False
+dag_concurrency = 16
+dags_are_paused_at_creation = False
+fernet_key = {FERNET_KEY}
+non_pooled_task_slot_count = 128
+
+[cli]
+api_client = airflow.api.client.local_client
+endpoint_url = http://localhost:8080
+
+[api]
+auth_backend = airflow.api.auth.backend.default
+
+[operators]
+default_owner = airflow
+
+[webserver]
+base_url = http://localhost:8080
+web_server_host = 0.0.0.0
+web_server_port = 8080
+dag_orientation = LR
+dag_default_view = tree
+log_fetch_timeout_sec = 5
+hide_paused_dags_by_default = False
+
+[email]
+email_backend = airflow.utils.email.send_email_smtp
+
+[smtp]
+smtp_host = localhost
+smtp_user = airflow
+smtp_port = 25
+smtp_password = airflow
+smtp_mail_from = airflow@airflow.com
+
+[celery]
+celery_app_name = airflow.executors.celery_executor
+celeryd_concurrency = 16
+worker_log_server_port = 8793
+broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
+celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
+flower_host = 0.0.0.0
+flower_port = 5555
+default_queue = default
+
+[scheduler]
+job_heartbeat_sec = 1
+scheduler_heartbeat_sec = 5
+authenticate = true
+max_threads = 2
+catchup_by_default = True
+scheduler_zombie_task_threshold = 300
+dag_dir_list_interval = 0

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1da7450c/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 9c7a03e..f140be2 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -34,7 +34,7 @@ from builtins import str
 from collections import OrderedDict
 from configparser import ConfigParser
 
-from .exceptions import AirflowConfigException
+from airflow.exceptions import AirflowConfigException
 
 # show Airflow's deprecation warnings
 warnings.filterwarnings(
@@ -88,437 +88,11 @@ def run_command(command):
 
     return output
 
-
-DEFAULT_CONFIG = """\
-[core]
-# The home folder for airflow, default is ~/airflow
-airflow_home = {AIRFLOW_HOME}
-
-# The folder where your airflow pipelines live, most likely a
-# subfolder in a code repository
-# This path must be absolute
-dags_folder = {AIRFLOW_HOME}/dags
-
-# The folder where airflow should store its log files
-# This path must be absolute
-base_log_folder = {AIRFLOW_HOME}/logs
-
-# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
-# must supply a remote location URL (starting with either 's3://...' or
-# 'gs://...') and an Airflow connection id that provides access to the storage
-# location.
-remote_base_log_folder =
-remote_log_conn_id =
-# Use server-side encryption for logs stored in S3
-encrypt_s3_logs = False
-# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
-s3_log_folder =
-
-# The executor class that airflow should use. Choices include
-# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
-executor = SequentialExecutor
-
-# The SqlAlchemy connection string to the metadata database.
-# SqlAlchemy supports many different database engine, more information
-# their website
-sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db
-
-# The SqlAlchemy pool size is the maximum number of database connections
-# in the pool.
-sql_alchemy_pool_size = 5
-
-# The SqlAlchemy pool recycle is the number of seconds a connection
-# can be idle in the pool before it is invalidated. This config does
-# not apply to sqlite.
-sql_alchemy_pool_recycle = 3600
-
-# The amount of parallelism as a setting to the executor. This defines
-# the max number of task instances that should run simultaneously
-# on this airflow installation
-parallelism = 32
-
-# The number of task instances allowed to run concurrently by the scheduler
-dag_concurrency = 16
-
-# Are DAGs paused by default at creation
-dags_are_paused_at_creation = True
-
-# When not using pools, tasks are run in the "default pool",
-# whose size is guided by this config element
-non_pooled_task_slot_count = 128
-
-# The maximum number of active DAG runs per DAG
-max_active_runs_per_dag = 16
-
-# Whether to load the examples that ship with Airflow. It's good to
-# get started, but you probably want to set this to False in a production
-# environment
-load_examples = True
-
-# Where your Airflow plugins are stored
-plugins_folder = {AIRFLOW_HOME}/plugins
-
-# Secret key to save connection passwords in the db
-fernet_key = {FERNET_KEY}
-
-# Whether to disable pickling dags
-donot_pickle = False
-
-# How long before timing out a python file import while filling the DagBag
-dagbag_import_timeout = 30
-
-# The class to use for running task instances in a subprocess
-task_runner = BashTaskRunner
-
-# If set, tasks without a `run_as_user` argument will be run with this user
-# Can be used to de-elevate a sudo user running Airflow when executing tasks
-default_impersonation =
-
-# What security module to use (for example kerberos):
-security =
-
-# Turn unit test mode on (overwrites many configuration options with test
-# values at runtime)
-unit_test_mode = False
-
-[cli]
-# In what way should the cli access the API. The LocalClient will use the
-# database directly, while the json_client will use the api running on the
-# webserver
-api_client = airflow.api.client.local_client
-endpoint_url = http://localhost:8080
-
-[api]
-# How to authenticate users of the API
-auth_backend = airflow.api.auth.backend.default
-
-[operators]
-# The default owner assigned to each new operator, unless
-# provided explicitly or passed via `default_args`
-default_owner = Airflow
-default_cpus = 1
-default_ram = 512
-default_disk = 512
-default_gpus = 0
-
-
-[webserver]
-# The base url of your website as airflow cannot guess what domain or
-# cname you are using. This is used in automated emails that
-# airflow sends to point links to the right web server
-base_url = http://localhost:8080
-
-# The ip specified when starting the web server
-web_server_host = 0.0.0.0
-
-# The port on which to run the web server
-web_server_port = 8080
-
-# Paths to the SSL certificate and key for the web server. When both are
-# provided SSL will be enabled. This does not change the web server port.
-web_server_ssl_cert =
-web_server_ssl_key =
-
-# Number of seconds the gunicorn webserver waits before timing out on a worker
-web_server_worker_timeout = 120
-
-# Number of workers to refresh at a time. When set to 0, worker refresh is
-# disabled. When nonzero, airflow periodically refreshes webserver workers by
-# bringing up new ones and killing old ones.
-worker_refresh_batch_size = 1
-
-# Number of seconds to wait before refreshing a batch of workers.
-worker_refresh_interval = 30
-
-# Secret key used to run your flask app
-secret_key = temporary_key
-
-# Number of workers to run the Gunicorn web server
-workers = 4
-
-# The worker class gunicorn should use. Choices include
-# sync (default), eventlet, gevent
-worker_class = sync
-
-# Log files for the gunicorn webserver. '-' means log to stderr.
-access_logfile = -
-error_logfile = -
-
-# Expose the configuration file in the web server
-expose_config = False
-
-# Set to true to turn on authentication:
-# http://pythonhosted.org/airflow/security.html#web-authentication
-authenticate = False
-
-# Filter the list of dags by owner name (requires authentication to be enabled)
-filter_by_owner = False
-
-# Filtering mode. Choices include user (default) and ldapgroup.
-# Ldap group filtering requires using the ldap backend
-#
-# Note that the ldap server needs the "memberOf" overlay to be set up
-# in order to user the ldapgroup mode.
-owner_mode = user
-
-# Default DAG view.  Valid values are:
-# tree, graph, duration, gantt, landing_times
-dag_default_view = tree
-
-# Default DAG orientation. Valid values are:
-# LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
-dag_orientation = LR
-
-# Puts the webserver in demonstration mode; blurs the names of Operators for
-# privacy.
-demo_mode = False
-
-# The amount of time (in secs) webserver will wait for initial handshake
-# while fetching logs from other worker machine
-log_fetch_timeout_sec = 5
-
-# By default, the webserver shows paused DAGs. Flip this to hide paused
-# DAGs by default
-hide_paused_dags_by_default = False
-
-[email]
-email_backend = airflow.utils.email.send_email_smtp
-
-
-[smtp]
-# If you want airflow to send emails on retries, failure, and you want to use
-# the airflow.utils.email.send_email_smtp function, you have to configure an
-# smtp server here
-smtp_host = localhost
-smtp_starttls = True
-smtp_ssl = False
-# Uncomment and set the user/pass settings if you want to use SMTP AUTH
-# smtp_user = airflow
-# smtp_password = airflow
-smtp_port = 25
-smtp_mail_from = airflow@airflow.com
-
-
-[celery]
-# This section only applies if you are using the CeleryExecutor in
-# [core] section above
-
-# The app name that will be used by celery
-celery_app_name = airflow.executors.celery_executor
-
-# The concurrency that will be used when starting workers with the
-# "airflow worker" command. This defines the number of task instances that
-# a worker will take, so size up your workers based on the resources on
-# your worker box and the nature of your tasks
-celeryd_concurrency = 16
-
-# When you start an airflow worker, airflow starts a tiny web server
-# subprocess to serve the workers local log files to the airflow main
-# web server, who then builds pages and sends them to users. This defines
-# the port on which the logs are served. It needs to be unused, and open
-# visible from the main web server to connect into the workers.
-worker_log_server_port = 8793
-
-# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
-# a sqlalchemy database. Refer to the Celery documentation for more
-# information.
-broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
-
-# Another key Celery setting
-celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
-
-# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
-# it `airflow flower`. This defines the IP that Celery Flower runs on
-flower_host = 0.0.0.0
-
-# This defines the port that Celery Flower runs on
-flower_port = 5555
-
-# Default queue that tasks get assigned to and that worker listen on.
-default_queue = default
-
-
-[dask]
-# This section only applies if you are using the DaskExecutor in
-# [core] section above
-
-# The IP address and port of the Dask cluster's scheduler.
-cluster_address = 127.0.0.1:8786
-
-
-[scheduler]
-# Task instances listen for external kill signal (when you clear tasks
-# from the CLI or the UI), this defines the frequency at which they should
-# listen (in seconds).
-job_heartbeat_sec = 5
-
-# The scheduler constantly tries to trigger new tasks (look at the
-# scheduler section in the docs for more information). This defines
-# how often the scheduler should run (in seconds).
-scheduler_heartbeat_sec = 5
-
-# after how much time should the scheduler terminate in seconds
-# -1 indicates to run continuously (see also num_runs)
-run_duration = -1
-
-# after how much time a new DAGs should be picked up from the filesystem
-min_file_process_interval = 0
-
-dag_dir_list_interval = 300
-
-# How often should stats be printed to the logs
-print_stats_interval = 30
-
-child_process_log_directory = {AIRFLOW_HOME}/logs/scheduler
-
-# Local task jobs periodically heartbeat to the DB. If the job has
-# not heartbeat in this many seconds, the scheduler will mark the
-# associated task instance as failed and will re-schedule the task.
-scheduler_zombie_task_threshold = 300
-
-# Turn off scheduler catchup by setting this to False.
-# Default behavior is unchanged and
-# Command Line Backfills still work, but the scheduler
-# will not do scheduler catchup if this is False,
-# however it can be set on a per DAG basis in the
-# DAG definition (catchup)
-catchup_by_default = True
-
-# Statsd (https://github.com/etsy/statsd) integration settings
-statsd_on = False
-statsd_host = localhost
-statsd_port = 8125
-statsd_prefix = airflow
-
-# The scheduler can run multiple threads in parallel to schedule dags.
-# This defines how many threads will run. However airflow will never
-# use more threads than the amount of cpu cores available.
-max_threads = 2
-
-authenticate = False
-
-
-[mesos]
-# Mesos master address which MesosExecutor will connect to.
-master = localhost:5050
-
-# The framework name which Airflow scheduler will register itself as on mesos
-framework_name = Airflow
-
-# Number of cpu cores required for running one task instance using
-# 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
-# command on a mesos slave
-task_cpu = 1
-
-# Memory in MB required for running one task instance using
-# 'airflow run <dag_id> <task_id> <execution_date> --local -p <pickle_id>'
-# command on a mesos slave
-task_memory = 256
-
-# Enable framework checkpointing for mesos
-# See http://mesos.apache.org/documentation/latest/slave-recovery/
-checkpoint = False
-
-# Failover timeout in milliseconds.
-# When checkpointing is enabled and this option is set, Mesos waits
-# until the configured timeout for
-# the MesosExecutor framework to re-register after a failover. Mesos
-# shuts down running tasks if the
-# MesosExecutor framework fails to re-register within this timeframe.
-# failover_timeout = 604800
-
-# Enable framework authentication for mesos
-# See http://mesos.apache.org/documentation/latest/configuration/
-authenticate = False
-
-# Mesos credentials, if authentication is enabled
-# default_principal = admin
-# default_secret = admin
-
-
-[kerberos]
-ccache = /tmp/airflow_krb5_ccache
-# gets augmented with fqdn
-principal = airflow
-reinit_frequency = 3600
-kinit_path = kinit
-keytab = airflow.keytab
-
-
-[github_enterprise]
-api_rev = v3
-
-
-[admin]
-# UI to hide sensitive variable fields when set to True
-hide_sensitive_variable_fields = True
-
-"""
-
-TEST_CONFIG = """\
-[core]
-unit_test_mode = True
-airflow_home = {AIRFLOW_HOME}
-dags_folder = {TEST_DAGS_FOLDER}
-plugins_folder = {TEST_PLUGINS_FOLDER}
-base_log_folder = {AIRFLOW_HOME}/logs
-executor = SequentialExecutor
-sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
-load_examples = True
-donot_pickle = False
-dag_concurrency = 16
-dags_are_paused_at_creation = False
-fernet_key = {FERNET_KEY}
-non_pooled_task_slot_count = 128
-
-[cli]
-api_client = airflow.api.client.local_client
-endpoint_url = http://localhost:8080
-
-[api]
-auth_backend = airflow.api.auth.backend.default
-
-[operators]
-default_owner = airflow
-
-[webserver]
-base_url = http://localhost:8080
-web_server_host = 0.0.0.0
-web_server_port = 8080
-dag_orientation = LR
-dag_default_view = tree
-log_fetch_timeout_sec = 5
-hide_paused_dags_by_default = False
-
-[email]
-email_backend = airflow.utils.email.send_email_smtp
-
-[smtp]
-smtp_host = localhost
-smtp_user = airflow
-smtp_port = 25
-smtp_password = airflow
-smtp_mail_from = airflow@airflow.com
-
-[celery]
-celery_app_name = airflow.executors.celery_executor
-celeryd_concurrency = 16
-worker_log_server_port = 8793
-broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
-celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
-flower_host = 0.0.0.0
-flower_port = 5555
-default_queue = default
-
-[scheduler]
-job_heartbeat_sec = 1
-scheduler_heartbeat_sec = 5
-authenticate = true
-max_threads = 2
-catchup_by_default = True
-scheduler_zombie_task_threshold = 300
-dag_dir_list_interval = 0
-"""
+_templates_dir = os.path.join(os.path.dirname(__file__), 'config_templates')
+with open(os.path.join(_templates_dir, 'default_airflow.cfg')) as f:
+    DEFAULT_CONFIG = f.read()
+with open(os.path.join(_templates_dir, 'default_test.cfg')) as f:
+    TEST_CONFIG = f.read()
 
 
 class AirflowConfigParser(ConfigParser):
@@ -773,33 +347,35 @@ def parameterized_config(template):
     :param template: a config content templated with {{variables}}
     """
     all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
-    if 'FERNET_KEY' not in all_vars:
-        all_vars['FERNET_KEY'] = ''
     return template.format(**all_vars)
 
+
 TEST_CONFIG_FILE = AIRFLOW_HOME + '/unittests.cfg'
+
 # only generate a Fernet key if we need to create a new config file
 if not os.path.isfile(TEST_CONFIG_FILE) or not os.path.isfile(AIRFLOW_CONFIG):
     FERNET_KEY = generate_fernet_key()
+else:
+    FERNET_KEY = ''
+
+TEMPLATE_START = (
+    '# ----------------------- TEMPLATE BEGINS HERE -----------------------')
 if not os.path.isfile(TEST_CONFIG_FILE):
     logging.info(
         'Creating new Airflow config file for unit tests in: {}'.format(
             TEST_CONFIG_FILE))
     with open(TEST_CONFIG_FILE, 'w') as f:
-        f.write(parameterized_config(TEST_CONFIG))
-
+        cfg = parameterized_config(TEST_CONFIG)
+        f.write(cfg.split(TEMPLATE_START)[-1].strip())
 if not os.path.isfile(AIRFLOW_CONFIG):
-    # These configuration options are used to generate a default configuration
-    # when it is missing. The right way to change your configuration is to
-    # alter your configuration file, not this code.
     logging.info('Creating new Airflow config file in: {}'.format(
         AIRFLOW_CONFIG))
     with open(AIRFLOW_CONFIG, 'w') as f:
-        f.write(parameterized_config(DEFAULT_CONFIG))
+        cfg = parameterized_config(DEFAULT_CONFIG)
+        f.write(cfg.split(TEMPLATE_START)[-1].strip())
 
 logging.info("Reading the config from " + AIRFLOW_CONFIG)
 
-
 conf = AirflowConfigParser()
 conf.read(AIRFLOW_CONFIG)