Posted to commits@airflow.apache.org by bo...@apache.org on 2017/10/02 15:14:08 UTC

incubator-airflow git commit: [AIRFLOW-1611] Customize logging

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 05bdd7413 -> 3c3a65a3f


[AIRFLOW-1611] Customize logging

Change the logging configuration to make use of
Python's builtin logging module and to make the
configuration easy to customize. Some settings are
no longer needed, since they can easily be
implemented in the config file.

Closes #2631 from Fokko/AIRFLOW-1611-customize-
logging-in-airflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3c3a65a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3c3a65a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3c3a65a3

Branch: refs/heads/master
Commit: 3c3a65a3fe2edbd7eee1d736735194a1ca14962a
Parents: 05bdd74
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Mon Oct 2 17:14:01 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Oct 2 17:14:01 2017 +0200

----------------------------------------------------------------------
 UPDATING.md                                     | 126 ++++++++++-
 .../config_templates/airflow_local_settings.py  |  86 ++++++++
 airflow/config_templates/default_airflow.cfg    |  18 +-
 .../config_templates/default_airflow_logging.py |  94 --------
 airflow/jobs.py                                 |   5 +-
 airflow/logging_config.py                       |  64 ++++++
 airflow/settings.py                             |  53 +----
 airflow/www/app.py                              |   4 +-
 tests/core.py                                   |  48 -----
 tests/test_logging_config.py                    | 216 +++++++++++++++++++
 tests/utils/test_log_handlers.py                |  16 +-
 tests/www/test_views.py                         |  29 ++-
 12 files changed, 538 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index cde7141..0b21dd1 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -3,7 +3,7 @@
 This file documents any backwards-incompatible changes in Airflow and
 assists people when migrating to a new version.
 
-## Master
+## Airflow 1.9
 
 ### SSH Hook updates, along with new SSH Operator & SFTP Operator
   SSH Hook now uses Paramiko library to create ssh client connection, instead of sub-process based ssh command execution previously (<1.9.0), so this is backward incompatible.
@@ -13,9 +13,129 @@ assists people when migrating to a new version.
  - No updates are required if you are using ftpHook, it will continue to work as is.
 
 ### Logging update
-Airflow's logging has been rewritten to uses Python’s builtin `logging` module to perform system logging. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. The main benefit that this brings to us is the easy configuration of the logging through `default_airflow_logging.py` and the ability to use different handlers for logging.
 
-Logs now are stored in the log folder as `{dag_id}/{task_id}/{execution_date}/{try_number}.log`.
+The logging structure of Airflow has been rewritten to make configuration easier and the logging system more transparent.
+
+#### A quick recap about logging
+ 
+A logger is the entry point into the logging system. Each logger is a named bucket to which messages can be written for processing. A logger is configured with a log level, which describes the severity of the messages that the logger will handle. Python defines the following log levels: DEBUG, INFO, WARNING, ERROR and CRITICAL.
+
+Each message that is written to the logger is a Log Record. Each log record has a log level indicating the severity of that specific message, and can also contain useful metadata describing the event that is being logged, such as a stack trace or an error code.
+
+When a message is given to the logger, the log level of the message is compared to the log level of the logger. If the log level of the message meets or exceeds the log level of the logger itself, the message will undergo further processing. If it doesn’t, the message will be ignored.
+
+Once a logger has determined that a message needs to be processed, it is passed to a Handler. In Airflow this configuration is now more flexible and can easily be maintained in a single file.
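+
+As a minimal sketch of these concepts in plain Python (standard library only, independent of Airflow):
+
+```
+import logging
+
+# A named logger with its own level: DEBUG messages will be ignored
+logger = logging.getLogger('example')
+logger.setLevel(logging.INFO)
+
+# A handler decides where accepted log records end up (here: stderr)
+handler = logging.StreamHandler()
+handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s'))
+logger.addHandler(handler)
+
+logger.debug('dropped: below the logger level')
+logger.info('processed: handed to the handler')
+```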
+
+#### Changes in Airflow Logging
+
+Airflow's logging mechanism has been refactored to use Python's builtin `logging` module to perform logging of the application. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. The `BaseHook` and `BaseOperator` classes already extend `LoggingMixin`, so logging is readily available from hooks and operators.
+
+The main benefit is easier configuration of the logging through a single, centralized Python file. Disclaimer: there is still some inline configuration, but this will be removed eventually. The new logging class is selected by setting its dotted class path in your `~/airflow/airflow.cfg` file:
+
+```
+# Logging class
+# Specify the class that defines the logging configuration
+# This class has to be on the Python classpath
+logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
+```
+
+The logging configuration file needs to be on the `PYTHONPATH`, for example in `~/airflow/dags` or `~/airflow/plugins`. These directories are loaded by default; of course, you are also free to add another directory to the `PYTHONPATH`, which can be handy when the config lives elsewhere or when you mount a volume in the case of Docker. As an example you can start from `airflow.config_templates.airflow_local_settings.LOGGING_CONFIG`:
+
+```
+LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow.task': {
+            'format': LOG_FORMAT,
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        },
+        'file.task': {
+            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
+            'formatter': 'airflow.task',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        # When using s3 or gcs, provide a customized LOGGING_CONFIG
+        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
+        # for details
+        's3.task': {
+            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+            'formatter': 'airflow.task',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            's3_log_folder': S3_LOG_FOLDER,
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'gcs.task': {
+            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+            'formatter': 'airflow.task',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'gcs_log_folder': GCS_LOG_FOLDER,
+            'filename_template': FILENAME_TEMPLATE,
+        },
+    },
+    'loggers': {
+        'airflow.task': {
+            'handlers': ['file.task'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'airflow.task_runner': {
+            'handlers': ['file.task'],
+            'level': LOG_LEVEL,
+            'propagate': True,
+        },
+        'airflow': {
+            'handlers': ['console'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+    }
+}
+```
+
+If you want to customize the logging (for example, to use log rotation), you can do this by using one or more of the logging handlers that [Python has to offer](https://docs.python.org/3/library/logging.handlers.html). For more details about Python logging, please refer to the [official logging documentation](https://docs.python.org/3/library/logging.html).
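+
+As an illustration, a minimal sketch that starts from a copy of the default config and swaps in a rotating file handler from the standard library (the handler name `task.rotating` and the log path are illustrative assumptions, not part of Airflow):
+
+```
+import os
+from copy import deepcopy
+
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+
+LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
+
+# Standard library RotatingFileHandler: roll over after ~10 MB, keep 5 backups
+LOGGING_CONFIG['handlers']['task.rotating'] = {
+    'class': 'logging.handlers.RotatingFileHandler',
+    'formatter': 'airflow.task',
+    'filename': os.path.expanduser('~/airflow/logs/airflow.log'),
+    'maxBytes': 10 * 1024 * 1024,
+    'backupCount': 5,
+}
+
+# Send the core airflow logger through the rotating handler
+LOGGING_CONFIG['loggers']['airflow']['handlers'] = ['task.rotating']
+```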
+
+Furthermore, this change also simplifies logging within the DAG itself:
+
+```
+root@ae1bc863e815:/airflow# python
+Python 3.6.2 (default, Sep 13 2017, 14:26:54) 
+[GCC 4.9.2] on linux
+Type "help", "copyright", "credits" or "license" for more information.
+>>> from airflow.settings import *
+>>> 
+>>> from datetime import datetime
+>>> from airflow import DAG
+>>> from airflow.operators.dummy_operator import DummyOperator
+>>> 
+>>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1))
+>>> 
+>>> task = DummyOperator(task_id='task_1', dag=dag)
+>>> 
+>>> task.log.error('I want to say something..')
+[2017-09-25 20:17:04,927] {<stdin>:1} ERROR - I want to say something..
+```
+
+#### Template path of the file_task_handler
+
+The `file_task_handler` logger is more flexible. You can change the default format, `{dag_id}/{task_id}/{execution_date}/{try_number}.log`, by supplying a Jinja template in the `FILENAME_TEMPLATE` configuration variable. See the `file_task_handler` for more information.
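+
+For example, a sketch of a customized template that groups logs by execution date first; any variable rendered for the task instance, such as `ti`, `ts` and `try_number`, can be used:
+
+```
+FILENAME_TEMPLATE = '{{ ts }}/{{ ti.dag_id }}/{{ ti.task_id }}/{{ try_number }}.log'
+```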
+
+#### I'm using S3Log or GCSLogs, what do I do!?
+
+If you are logging to either S3Log or GCSLogs, you will need a custom logging config. The `REMOTE_BASE_LOG_FOLDER` configuration key in your airflow config has been removed, therefore you will need to take the following steps (a sketch follows after this list):
+ - Copy the logging configuration from [`airflow/config_templates/airflow_local_settings.py`](https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py).
+ - Place it in a directory that is on the Python import path (`PYTHONPATH`). If you are using Python 2.7, ensure that the `__init__.py` files exist so that it is importable.
+ - Set the remote log folder (for example the `s3_log_folder` or `gcs_log_folder` key of the handler) explicitly in the logging config; the `REMOTE_BASE_LOG_FOLDER` key is not used anymore.
+ - Set `logging_config_class` to the module path and dictionary name. For example, if you place `custom_logging_config.py` at the root of your `PYTHONPATH`, set `logging_config_class = custom_logging_config.LOGGING_CONFIG` in your config.
+
+If you are not using remote logging, you don't need to change anything: when there is no custom config, the airflow config loader will still default to the same config.
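+
+As a sketch of such a custom config for S3 (the `S3_LOG_FOLDER` value below is an illustrative bucket path; the handler and logger names mirror the template shipped with Airflow):
+
+```
+import os
+from copy import deepcopy
+
+from airflow import configuration as conf
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+
+# Explicit remote location, replacing the removed REMOTE_BASE_LOG_FOLDER key
+S3_LOG_FOLDER = 's3://my-bucket/airflow/logs'
+
+BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
+FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
+
+LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
+LOGGING_CONFIG['handlers']['s3.task'] = {
+    'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+    'formatter': 'airflow.task',
+    'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+    's3_log_folder': S3_LOG_FOLDER,
+    'filename_template': FILENAME_TEMPLATE,
+}
+
+# Route task logs through the S3 handler instead of the local file handler
+LOGGING_CONFIG['loggers']['airflow.task']['handlers'] = ['s3.task']
+LOGGING_CONFIG['loggers']['airflow.task_runner']['handlers'] = ['s3.task']
+```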
 
 ### New Features
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/airflow/config_templates/airflow_local_settings.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
new file mode 100644
index 0000000..fe3337f
--- /dev/null
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from airflow import configuration as conf
+
+# TODO: Logging format and level should be configured
+# in this file instead of from airflow.cfg. Currently
+# there are other log format and level configurations in
+# settings.py and cli.py. Please see AIRFLOW-1455.
+
+LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
+LOG_FORMAT = conf.get('core', 'log_format')
+
+BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
+
+FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
+
+DEFAULT_LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow.task': {
+            'format': LOG_FORMAT,
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        },
+        'file.task': {
+            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
+            'formatter': 'airflow.task',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'filename_template': FILENAME_TEMPLATE,
+        }
+        # When using s3 or gcs, provide a customized LOGGING_CONFIG
+        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
+        # for details
+        # 's3.task': {
+        #     'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+        #     'formatter': 'airflow.task',
+        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+        #     's3_log_folder': S3_LOG_FOLDER,
+        #     'filename_template': FILENAME_TEMPLATE,
+        # },
+        # 'gcs.task': {
+        #     'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+        #     'formatter': 'airflow.task',
+        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+        #     'gcs_log_folder': GCS_LOG_FOLDER,
+        #     'filename_template': FILENAME_TEMPLATE,
+        # },
+    },
+    'loggers': {
+        'airflow.task': {
+            'handlers': ['file.task'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'airflow.task_runner': {
+            'handlers': ['file.task'],
+            'level': LOG_LEVEL,
+            'propagate': True,
+        },
+        'airflow': {
+            'handlers': ['console'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index ef586b8..b051583 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -36,23 +36,22 @@ dags_folder = {AIRFLOW_HOME}/dags
 base_log_folder = {AIRFLOW_HOME}/logs
 
 # Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
-# must supply a remote location URL (starting with either 's3://...' or
-# 'gs://...') and an Airflow connection id that provides access to the storage
+# must supply an Airflow connection id that provides access to the storage
 # location.
-remote_base_log_folder =
 remote_log_conn_id =
-# Use server-side encryption for logs stored in S3
 encrypt_s3_logs = False
-# DEPRECATED option for remote log storage, use remote_base_log_folder instead!
-s3_log_folder =
 
 # Logging level
 logging_level = INFO
 
+# Logging class
+# Specify the class that defines the logging configuration
+# This class has to be on the Python classpath
+# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
+logging_config_class =
+
 # Log format
 log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
-log_format_with_pid = [%%(asctime)s] [%%(process)d] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
-log_format_with_thread_name = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(threadName)s %%(levelname)s - %%(message)s
 simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
 
 # The executor class that airflow should use. Choices include
@@ -122,9 +121,6 @@ security =
 # values at runtime)
 unit_test_mode = False
 
-# User defined logging configuration file path.
-logging_config_path =
-
 # Name of handler to read task instance logs.
 # Default to use file task handler.
 task_log_reader = file.task

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/airflow/config_templates/default_airflow_logging.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow_logging.py b/airflow/config_templates/default_airflow_logging.py
deleted file mode 100644
index 523b4e8..0000000
--- a/airflow/config_templates/default_airflow_logging.py
+++ /dev/null
@@ -1,94 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-
-from airflow import configuration as conf
-
-# TODO: Logging format and level should be configured
-# in this file instead of from airflow.cfg. Currently
-# there are other log format and level configurations in
-# settings.py and cli.py. Please see AIRFLOW-1455.
-
-LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
-LOG_FORMAT = conf.get('core', 'log_format')
-
-BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
-
-# TODO: REMOTE_BASE_LOG_FOLDER should be deprecated and
-# directly specify in the handler definitions. This is to
-# provide compatibility to older remote log folder settings.
-REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
-S3_LOG_FOLDER = ''
-GCS_LOG_FOLDER = ''
-if REMOTE_BASE_LOG_FOLDER.startswith('s3:/'):
-    S3_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER
-elif REMOTE_BASE_LOG_FOLDER.startswith('gs:/'):
-    GCS_LOG_FOLDER = REMOTE_BASE_LOG_FOLDER
-
-FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
-
-DEFAULT_LOGGING_CONFIG = {
-    'version': 1,
-    'disable_existing_loggers': False,
-    'formatters': {
-        'airflow.task': {
-            'format': LOG_FORMAT,
-        },
-    },
-    'handlers': {
-        'console': {
-            'class': 'logging.StreamHandler',
-            'formatter': 'airflow.task',
-            'stream': 'ext://sys.stdout'
-        },
-        'file.task': {
-            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
-            'formatter': 'airflow.task',
-            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
-            'filename_template': FILENAME_TEMPLATE,
-        },
-        's3.task': {
-            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
-            'formatter': 'airflow.task',
-            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
-            's3_log_folder': S3_LOG_FOLDER,
-            'filename_template': FILENAME_TEMPLATE,
-        },
-        'gcs.task': {
-            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
-            'formatter': 'airflow.task',
-            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
-            'gcs_log_folder': GCS_LOG_FOLDER,
-            'filename_template': FILENAME_TEMPLATE,
-        },
-    },
-    'loggers': {
-        'airflow.task': {
-            'handlers': ['file.task'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        },
-        'airflow.task_runner': {
-            'handlers': ['file.task'],
-            'level': LOG_LEVEL,
-            'propagate': True,
-        },
-        'airflow.task.raw': {
-            'handlers': ['console'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        },
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ce9cb0f..6f4cf97 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -40,6 +40,7 @@ from time import sleep
 from airflow import configuration as conf
 from airflow import executors, models, settings
 from airflow.exceptions import AirflowException
+from airflow.logging_config import configure_logging
 from airflow.models import DAG, DagRun
 from airflow.settings import Stats
 from airflow.task_runner import get_task_runner
@@ -372,9 +373,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
             sys.stderr = f
 
             try:
-                # Re-configure logging to use the new output streams
-                log_format = settings.LOG_FORMAT_WITH_THREAD_NAME
-                settings.configure_logging(log_format=log_format)
+                configure_logging()
                 # Re-configure the ORM engine as there are issues with multiple processes
                 settings.configure_orm()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/airflow/logging_config.py
----------------------------------------------------------------------
diff --git a/airflow/logging_config.py b/airflow/logging_config.py
new file mode 100644
index 0000000..d8c288a
--- /dev/null
+++ b/airflow/logging_config.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import sys
+from logging.config import dictConfig
+
+from airflow import configuration as conf
+from airflow.exceptions import AirflowConfigException
+from airflow.utils.module_loading import import_string
+
+log = logging.getLogger(__name__)
+
+
+def configure_logging():
+    logging_class_path = ''
+    try:
+        logging_class_path = conf.get('core', 'logging_config_class')
+    except AirflowConfigException:
+        log.debug('Could not find key logging_config_class in config')
+
+    if logging_class_path:
+        try:
+            logging_config = import_string(logging_class_path)
+
+            # Make sure the imported config is a dict, as dictConfig expects
+            assert (isinstance(logging_config, dict))
+
+            log.info(
+                'Successfully imported user-defined logging config from %s',
+                logging_class_path
+            )
+        except Exception:
+            # Import default logging configurations.
+            raise ImportError(
+                'Unable to load custom logging from {}'.format(logging_class_path)
+            )
+    else:
+        from airflow.config_templates.airflow_local_settings import (
+            DEFAULT_LOGGING_CONFIG as logging_config
+        )
+        log.debug('Unable to load custom logging, using default config instead')
+
+    try:
+        # Try to init logging
+        dictConfig(logging_config)
+    except ValueError as e:
+        log.warning('Unable to load the config, contains a configuration error.')
+        # When there is an error in the config, escalate the exception
+        # otherwise Airflow would silently fall back on the default config
+        raise e
+
+    return logging_config

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 1e5e614..0dfbb15 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -18,21 +18,18 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 import logging
-import logging.config
 import os
-import sys
-
 from sqlalchemy import create_engine
 from sqlalchemy.orm import scoped_session, sessionmaker
 from sqlalchemy.pool import NullPool
 
 from airflow import configuration as conf
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.logging_config import configure_logging
 
-log = LoggingMixin().log
+log = logging.getLogger(__name__)
 
-class DummyStatsLogger(object):
 
+class DummyStatsLogger(object):
     @classmethod
     def incr(cls, stat, count=1, rate=1):
         pass
@@ -49,10 +46,12 @@ class DummyStatsLogger(object):
     def timing(cls, stat, dt):
         pass
 
+
 Stats = DummyStatsLogger
 
 if conf.getboolean('scheduler', 'statsd_on'):
     from statsd import StatsClient
+
     statsd = StatsClient(
         host=conf.get('scheduler', 'statsd_host'),
         port=conf.getint('scheduler', 'statsd_port'),
@@ -61,7 +60,6 @@ if conf.getboolean('scheduler', 'statsd_on'):
 else:
     Stats = DummyStatsLogger
 
-
 HEADER = """\
   ____________       _____________
  ____    |__( )_________  __/__  /________      __
@@ -77,8 +75,6 @@ LOGGING_LEVEL = logging.INFO
 GUNICORN_WORKER_READY_PREFIX = "[ready] "
 
 LOG_FORMAT = conf.get('core', 'log_format')
-LOG_FORMAT_WITH_PID = conf.get('core', 'log_format_with_pid')
-LOG_FORMAT_WITH_THREAD_NAME = conf.get('core', 'log_format_with_thread_name')
 SIMPLE_LOG_FORMAT = conf.get('core', 'simple_log_format')
 
 AIRFLOW_HOME = None
@@ -116,28 +112,6 @@ def policy(task_instance):
     pass
 
 
-def configure_logging(log_format=LOG_FORMAT):
-
-    def _configure_logging(logging_level):
-        global LOGGING_LEVEL
-        logging.root.handlers = []
-        logging.basicConfig(
-            format=log_format, stream=sys.stdout, level=logging_level)
-        LOGGING_LEVEL = logging_level
-
-    if "logging_level" in conf.as_dict()["core"]:
-        logging_level = conf.get('core', 'LOGGING_LEVEL').upper()
-    else:
-        logging_level = LOGGING_LEVEL
-    try:
-        _configure_logging(logging_level)
-    except ValueError:
-        logging.warning(
-            "Logging level %s is not defined. Use default.", logging_level
-        )
-        _configure_logging(logging.INFO)
-
-
 def configure_vars():
     global AIRFLOW_HOME
     global SQL_ALCHEMY_CONN
@@ -163,8 +137,10 @@ def configure_orm(disable_connection_pool=False):
     Session = scoped_session(
         sessionmaker(autocommit=False, autoflush=False, bind=engine))
 
+
 try:
     from airflow_local_settings import *
+
     log.info("Loaded airflow_local_settings.")
 except:
     pass
@@ -173,21 +149,6 @@ configure_logging()
 configure_vars()
 configure_orm()
 
-# TODO: Unify airflow logging setups. Please see AIRFLOW-1457.
-logging_config_path = conf.get('core', 'logging_config_path')
-try:
-    from logging_config_path import LOGGING_CONFIG
-    log.debug("Successfully imported user-defined logging config.")
-except Exception as e:
-    # Import default logging configurations.
-    log.debug(
-        "Unable to load custom logging config file: %s. Using default airflow logging config instead",
-        e
-    )
-    from airflow.config_templates.default_airflow_logging import \
-        DEFAULT_LOGGING_CONFIG as LOGGING_CONFIG
-logging.config.dictConfig(LOGGING_CONFIG)
-
 # Const stuff
 
 KILOBYTE = 1024

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/airflow/www/app.py
----------------------------------------------------------------------
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 438a1e2..bbb9410 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -26,6 +26,7 @@ from airflow import models, LoggingMixin
 from airflow.settings import Session
 
 from airflow.www.blueprints import routes
+from airflow.logging_config import configure_logging
 from airflow import jobs
 from airflow import settings
 from airflow import configuration
@@ -52,8 +53,7 @@ def create_app(config=None, testing=False):
 
     app.register_blueprint(routes)
 
-    log_format = airflow.settings.LOG_FORMAT_WITH_PID
-    airflow.settings.configure_logging(log_format=log_format)
+    configure_logging()
 
     with app.app_context():
         from airflow.www import views

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 46ed681..0c94137 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -2500,54 +2500,6 @@ class EmailSmtpTest(unittest.TestCase):
         self.assertFalse(mock_smtp.called)
         self.assertFalse(mock_smtp_ssl.called)
 
-class LogTest(unittest.TestCase):
-    def setUp(self):
-        configuration.load_test_config()
-
-    def _log(self):
-        settings.configure_logging()
-
-        sio = six.StringIO()
-        handler = logging.StreamHandler(sio)
-        logger = logging.getLogger()
-        logger.addHandler(handler)
-
-        logging.debug("debug")
-        logging.info("info")
-        logging.warn("warn")
-
-        sio.flush()
-        return sio.getvalue()
-
-    def test_default_log_level(self):
-        s = self._log()
-        self.assertFalse("debug" in s)
-        self.assertTrue("info" in s)
-        self.assertTrue("warn" in s)
-
-    def test_change_log_level_to_debug(self):
-        configuration.set("core", "LOGGING_LEVEL", "DEBUG")
-        s = self._log()
-        self.assertTrue("debug" in s)
-        self.assertTrue("info" in s)
-        self.assertTrue("warn" in s)
-
-    def test_change_log_level_to_info(self):
-        configuration.set("core", "LOGGING_LEVEL", "INFO")
-        s = self._log()
-        self.assertFalse("debug" in s)
-        self.assertTrue("info" in s)
-        self.assertTrue("warn" in s)
-
-    def test_change_log_level_to_warn(self):
-        configuration.set("core", "LOGGING_LEVEL", "WARNING")
-        s = self._log()
-        self.assertFalse("debug" in s)
-        self.assertFalse("info" in s)
-        self.assertTrue("warn" in s)
-
-    def tearDown(self):
-        configuration.set("core", "LOGGING_LEVEL", "INFO")
 
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/tests/test_logging_config.py
----------------------------------------------------------------------
diff --git a/tests/test_logging_config.py b/tests/test_logging_config.py
new file mode 100644
index 0000000..9407221
--- /dev/null
+++ b/tests/test_logging_config.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import shutil
+import sys
+import tempfile
+import unittest
+from mock import patch, mock
+
+from airflow import configuration as conf
+from airflow.configuration import mkdir_p
+from airflow.exceptions import AirflowConfigException
+
+SETTINGS_FILE_VALID = """
+LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow.task': {
+            'format': '[%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s'
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        }
+    },
+    'loggers': {
+        'airflow': {
+            'handlers': ['console'],
+            'level': 'INFO',
+            'propagate': False
+        }
+    }
+}
+"""
+
+SETTINGS_FILE_INVALID = """
+LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow.task': {
+            'format': '[%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s'
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        }
+    },
+    'loggers': {
+        'airflow': {
+            'handlers': ['file.handler'], # this handler does not exist
+            'level': 'INFO',
+            'propagate': False
+        }
+    }
+}
+"""
+
+SETTINGS_FILE_EMPTY = """
+# Other settings here
+"""
+
+SETTINGS_DEFAULT_NAME = 'custom_airflow_local_settings'
+
+
+class settings_context(object):
+    """
+    Sets a settings file and puts it in the Python classpath
+
+    :param content:
+          The content of the settings file
+    """
+
+    def __init__(self, content, dir=None, name='LOGGING_CONFIG'):
+        self.content = content
+        self.settings_root = tempfile.mkdtemp()
+        filename = "{}.py".format(SETTINGS_DEFAULT_NAME)
+
+        if dir:
+            # Replace slashes by dots
+            self.module = dir.replace('/', '.') + '.' + SETTINGS_DEFAULT_NAME + '.' + name
+
+            # Create the directory structure
+            dir_path = os.path.join(self.settings_root, dir)
+            mkdir_p(dir_path)
+
+            # Add the __init__ for the directories
+            # This is required for Python 2.7
+            basedir = self.settings_root
+            for part in dir.split('/'):
+                open(os.path.join(basedir, '__init__.py'), 'w').close()
+                basedir = os.path.join(basedir, part)
+            open(os.path.join(basedir, '__init__.py'), 'w').close()
+
+            self.settings_file = os.path.join(dir_path, filename)
+        else:
+            self.module = SETTINGS_DEFAULT_NAME + '.' + name
+            self.settings_file = os.path.join(self.settings_root, filename)
+
+    def __enter__(self):
+        with open(self.settings_file, 'w') as handle:
+            handle.writelines(self.content)
+        sys.path.append(self.settings_root)
+        conf.set(
+            'core',
+            'logging_config_class',
+            self.module
+        )
+        return self.settings_file
+
+    def __exit__(self, *exc_info):
+        #shutil.rmtree(self.settings_root)
+        # Reset config
+        conf.set('core', 'logging_config_class', '')
+        sys.path.remove(self.settings_root)
+
+
+class TestLoggingSettings(unittest.TestCase):
+    # Make sure that the configure_logging is not cached
+    def setUp(self):
+        self.old_modules = dict(sys.modules)
+
+    def tearDown(self):
+        # Remove any new modules imported during the test run. This lets us
+        # import the same source files for more than one test.
+        for m in [m for m in sys.modules if m not in self.old_modules]:
+            del sys.modules[m]
+
+    # When we try to load an invalid config file, we expect an error
+    def test_loading_invalid_local_settings(self):
+        from airflow.logging_config import configure_logging, log
+        with settings_context(SETTINGS_FILE_INVALID):
+            with patch.object(log, 'warning') as mock_info:
+                # Load config
+                with self.assertRaises(ValueError):
+                    configure_logging()
+
+                mock_info.assert_called_with(
+                    'Unable to load the config, contains a configuration error.'
+                )
+
+    def test_loading_valid_complex_local_settings(self):
+        # Test what happens when the config is somewhere in a subfolder
+        module_structure = 'etc.airflow.config'
+        dir_structure = module_structure.replace('.', '/')
+        with settings_context(SETTINGS_FILE_VALID, dir_structure):
+            from airflow.logging_config import configure_logging, log
+            with patch.object(log, 'info') as mock_info:
+                configure_logging()
+                mock_info.assert_called_with(
+                    'Successfully imported user-defined logging config from %s',
+                    'etc.airflow.config.{}.LOGGING_CONFIG'.format(
+                        SETTINGS_DEFAULT_NAME
+                    )
+                )
+
+    # When we try to load a valid config
+    def test_loading_valid_local_settings(self):
+        with settings_context(SETTINGS_FILE_VALID):
+            from airflow.logging_config import configure_logging, log
+            with patch.object(log, 'info') as mock_info:
+                configure_logging()
+                mock_info.assert_called_with(
+                    'Successfully imported user-defined logging config from %s',
+                    '{}.LOGGING_CONFIG'.format(
+                        SETTINGS_DEFAULT_NAME
+                    )
+                )
+
+    # When we load an empty file, it should go to default
+    def test_loading_no_local_settings(self):
+        with settings_context(SETTINGS_FILE_EMPTY):
+            from airflow.logging_config import configure_logging
+            with self.assertRaises(ImportError):
+                configure_logging()
+
+    # When the key is not available in the configuration
+    def test_when_the_config_key_does_not_exists(self):
+        from airflow import logging_config
+
+        logging_config.conf.get = mock.Mock(side_effect=AirflowConfigException('boom'))
+
+        with patch.object(logging_config.log, 'debug') as mock_debug:
+            logging_config.configure_logging()
+            mock_debug.assert_any_call('Could not find key logging_config_class in config')
+
+    # Just default
+    def test_loading_local_settings_without_logging_config(self):
+        from airflow.logging_config import configure_logging, log
+        with patch.object(log, 'debug') as mock_info:
+            configure_logging()
+            mock_info.assert_called_with(
+                'Unable to load custom logging, using default config instead'
+            )
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/tests/utils/test_log_handlers.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 8337c5d..25faa7c 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -21,7 +21,7 @@ import unittest
 
 from datetime import datetime
 from airflow.models import TaskInstance, DAG
-from airflow.config_templates.default_airflow_logging import DEFAULT_LOGGING_CONFIG
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils.log.file_task_handler import FileTaskHandler
 
@@ -73,24 +73,24 @@ class TestFileTaskLogHandler(unittest.TestCase):
         # Remove the generated tmp log file.
         os.remove(log_filename)
 
-    
+
 class TestFilenameRendering(unittest.TestCase):
-    
+
     def setUp(self):
         dag = DAG('dag_for_testing_filename_rendering', start_date=DEFAULT_DATE)
         task = DummyOperator(task_id='task_for_testing_filename_rendering', dag=dag)
         self.ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
-    
+
     def test_python_formatting(self):
         expected_filename = 'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/%s/42.log' % DEFAULT_DATE.isoformat()
-        
+
         fth = FileTaskHandler('', '{dag_id}/{task_id}/{execution_date}/{try_number}.log')
         rendered_filename = fth._render_filename(self.ti, 42)
         self.assertEqual(expected_filename, rendered_filename)
-        
+
     def test_jinja_rendering(self):
         expected_filename = 'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/%s/42.log' % DEFAULT_DATE.isoformat()
-        
+
         fth = FileTaskHandler('', '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log')
         rendered_filename = fth._render_filename(self.ti, 42)
-        self.assertEqual(expected_filename, rendered_filename)
\ No newline at end of file
+        self.assertEqual(expected_filename, rendered_filename)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c3a65a3/tests/www/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 83bdc44..553e7cc 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -12,18 +12,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import unittest
+import copy
 import logging.config
 import os
-import copy
+import shutil
+import tempfile
+import unittest
 from datetime import datetime
+import sys
 
 from airflow import models, configuration, settings
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.models import DAG, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.settings import Session
 from airflow.www import app as application
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.config_templates.default_airflow_logging import DEFAULT_LOGGING_CONFIG
+from airflow import configuration as conf
 
 
 class TestChartModelView(unittest.TestCase):
@@ -295,7 +299,6 @@ class TestPoolModelView(unittest.TestCase):
 
 
 class TestLogView(unittest.TestCase):
-
     DAG_ID = 'dag_for_testing_log_view'
     TASK_ID = 'task_for_testing_log_view'
     DEFAULT_DATE = datetime(2017, 9, 1)
@@ -319,12 +322,21 @@ class TestLogView(unittest.TestCase):
     def setUp(self):
         super(TestLogView, self).setUp()
 
+        # Create a custom logging configuration
         configuration.load_test_config()
         logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
         current_dir = os.path.dirname(os.path.abspath(__file__))
         logging_config['handlers']['file.task']['base_log_folder'] = os.path.normpath(
             os.path.join(current_dir, 'test_logs'))
-        logging.config.dictConfig(logging_config)
+
+        # Write the custom logging configuration to a file
+        self.settings_folder = tempfile.mkdtemp()
+        settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+        new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+        with open(settings_file, 'w') as handle:
+            handle.writelines(new_logging_file)
+        sys.path.append(self.settings_folder)
+        conf.set('core', 'logging_config_class', 'airflow_local_settings.LOGGING_CONFIG')
 
         app = application.create_app(testing=True)
         self.app = app.test_client()
@@ -347,6 +359,11 @@ class TestLogView(unittest.TestCase):
             TaskInstance.execution_date == self.DEFAULT_DATE).delete()
         self.session.commit()
         self.session.close()
+
+        sys.path.remove(self.settings_folder)
+        shutil.rmtree(self.settings_folder)
+        conf.set('core', 'logging_config_class', '')
+
         super(TestLogView, self).tearDown()
 
     def test_get_file_task_log(self):