You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/19 08:17:30 UTC

[1/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8e253c750 -> eb2f58909


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/logging_mixin.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
new file mode 100644
index 0000000..a3aad5b
--- /dev/null
+++ b/airflow/utils/log/logging_mixin.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import logging
+import warnings
+from builtins import object
+
+
+class LoggingMixin(object):
+    """
+    Mixin exposing ``log``, a lazily created logger named <module>.<class>
+    """
+
+    # Deprecated alias of ``log`` (to be replaced in Airflow 2.0): ``log`` is
+    # the de facto standard name in most programming languages
+    @property
+    def logger(self):
+        warnings.warn(
+            'Initializing logger for {} using logger(), which will '
+            'be replaced by .log in Airflow 2.0'.format(
+                self.__class__.__module__ + '.' + self.__class__.__name__
+            ),
+            DeprecationWarning  # NOTE(review): no stacklevel is passed
+        )
+        return self.log
+
+    @property
+    def log(self):
+        try:
+            return self._log
+        except AttributeError:  # first access: create and cache the logger
+            self._log = logging.root.getChild(
+                self.__class__.__module__ + '.' + self.__class__.__name__
+            )
+            return self._log
+
+    def set_log_contexts(self, task_instance):
+        """
+        Call set_context(task_instance) on each handler of the current logger.
+        """
+        for handler in self.log.handlers:
+            try:
+                handler.set_context(task_instance)
+            except AttributeError:  # handler without set_context()
+                pass  # also swallows AttributeError raised inside set_context

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index 71fc149..2ed97a1 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -14,7 +14,7 @@
 import os
 
 from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.log.file_task_handler import FileTaskHandler
 
 
@@ -36,7 +36,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
             from airflow.hooks.S3_hook import S3Hook
             return S3Hook(remote_conn_id)
         except:
-            self.logger.error(
+            self.log.error(
                 'Could not create an S3Hook with connection id "%s". '
                 'Please make sure that airflow[s3] is installed and '
                 'the S3 connection exists.', remote_conn_id
@@ -132,7 +132,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
             # return error if needed
             if return_error:
                 msg = 'Could not read logs from {}'.format(remote_log_location)
-                self.logger.error(msg)
+                self.log.error(msg)
                 return msg
 
     def s3_write(self, log, remote_log_location, append=True):
@@ -159,4 +159,4 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
                 encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'),
             )
         except:
-            self.logger.error('Could not write logs to %s', remote_log_location)
+            self.log.error('Could not write logs to %s', remote_log_location)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/timeout.py
----------------------------------------------------------------------
diff --git a/airflow/utils/timeout.py b/airflow/utils/timeout.py
index 53f2149..e0b3f96 100644
--- a/airflow/utils/timeout.py
+++ b/airflow/utils/timeout.py
@@ -20,7 +20,7 @@ from __future__ import unicode_literals
 import signal
 
 from airflow.exceptions import AirflowTaskTimeout
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class timeout(LoggingMixin):
@@ -33,7 +33,7 @@ class timeout(LoggingMixin):
         self.error_message = error_message
 
     def handle_timeout(self, signum, frame):
-        self.logger.error("Process timed out")
+        self.log.error("Process timed out")
         raise AirflowTaskTimeout(self.error_message)
 
     def __enter__(self):
@@ -41,12 +41,12 @@ class timeout(LoggingMixin):
             signal.signal(signal.SIGALRM, self.handle_timeout)
             signal.alarm(self.seconds)
         except ValueError as e:
-            self.logger.warning("timeout can't be used in the current context")
-            self.logger.exception(e)
+            self.log.warning("timeout can't be used in the current context")
+            self.log.exception(e)
 
     def __exit__(self, type, value, traceback):
         try:
             signal.alarm(0)
         except ValueError as e:
-            self.logger.warning("timeout can't be used in the current context")
-            self.logger.exception(e)
+            self.log.warning("timeout can't be used in the current context")
+            self.log.exception(e)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index 4e5892d..b5a3052 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -18,7 +18,7 @@ from airflow.api.common.experimental import trigger_dag as trigger
 from airflow.api.common.experimental.get_task import get_task
 from airflow.api.common.experimental.get_task_instance import get_task_instance
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.www.app import csrf
 
 from flask import (
@@ -27,7 +27,7 @@ from flask import (
 )
 from datetime import datetime
 
-_log = LoggingMixin().logger
+_log = LoggingMixin().log
 
 requires_authentication = airflow.api.api_auth.requires_authentication
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/www/app.py
----------------------------------------------------------------------
diff --git a/airflow/www/app.py b/airflow/www/app.py
index f280713..438a1e2 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -113,7 +113,7 @@ def create_app(config=None, testing=False):
 
         def integrate_plugins():
             """Integrate plugins to the context"""
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             from airflow.plugins_manager import (
                 admin_views, flask_blueprints, menu_links)
             for v in admin_views:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/scripts/perf/scheduler_ops_metrics.py
----------------------------------------------------------------------
diff --git a/scripts/perf/scheduler_ops_metrics.py b/scripts/perf/scheduler_ops_metrics.py
index 40e1b36..34b5a83 100644
--- a/scripts/perf/scheduler_ops_metrics.py
+++ b/scripts/perf/scheduler_ops_metrics.py
@@ -119,9 +119,9 @@ class SchedulerMetricsJob(SchedulerJob):
                 (datetime.now()-self.start_date).total_seconds() >
                 MAX_RUNTIME_SECS):
             if (len(successful_tis) == num_task_instances):
-                self.logger.info("All tasks processed! Printing stats.")
+                self.log.info("All tasks processed! Printing stats.")
             else:
-                self.logger.info("Test timeout reached. "
+                self.log.info("Test timeout reached. "
                                  "Printing available stats.")
             self.print_stats()
             set_dags_paused_state(True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/contrib/hooks/test_databricks_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_databricks_hook.py b/tests/contrib/hooks/test_databricks_hook.py
index e091067..3931bd3 100644
--- a/tests/contrib/hooks/test_databricks_hook.py
+++ b/tests/contrib/hooks/test_databricks_hook.py
@@ -111,7 +111,7 @@ class DatabricksHookTest(unittest.TestCase):
     @mock.patch('airflow.contrib.hooks.databricks_hook.requests')
     def test_do_api_call_with_error_retry(self, mock_requests):
         for exception in [requests_exceptions.ConnectionError, requests_exceptions.Timeout]:
-            with mock.patch.object(self.hook.logger, 'error') as mock_errors:
+            with mock.patch.object(self.hook.log, 'error') as mock_errors:
                 mock_requests.reset_mock()
                 mock_requests.post.side_effect = exception()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index 89ad258..7ce6199 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -132,7 +132,7 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                 zone=ZONE,
                 dag=self.dag
             )
-            with patch.object(dataproc_task.logger, 'info') as mock_info:
+            with patch.object(dataproc_task.log, 'info') as mock_info:
                 with self.assertRaises(TypeError) as _:
                     dataproc_task.execute(None)
                 mock_info.assert_called_with('Creating cluster: %s', CLUSTER_NAME)
@@ -148,7 +148,7 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                 zone=ZONE,
                 dag=self.dag
             )
-            with patch.object(dataproc_task.logger, 'info') as mock_info:
+            with patch.object(dataproc_task.log, 'info') as mock_info:
                 context = { 'ts_nodash' : 'testnodash'}
 
                 rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)
@@ -190,7 +190,7 @@ class DataprocClusterDeleteOperatorTest(unittest.TestCase):
                 project_id=PROJECT_ID,
                 dag=self.dag
             )
-            with patch.object(dataproc_task.logger, 'info') as mock_info:
+            with patch.object(dataproc_task.log, 'info') as mock_info:
                 with self.assertRaises(TypeError) as _:
                     dataproc_task.execute(None)
                 mock_info.assert_called_with('Deleting cluster: %s', CLUSTER_NAME)
@@ -205,7 +205,7 @@ class DataprocClusterDeleteOperatorTest(unittest.TestCase):
                 dag=self.dag
             )
 
-            with patch.object(dataproc_task.logger, 'info') as mock_info:
+            with patch.object(dataproc_task.log, 'info') as mock_info:
                 context = { 'ts_nodash' : 'testnodash'}
 
                 rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/contrib/sensors/test_hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_hdfs_sensors.py b/tests/contrib/sensors/test_hdfs_sensors.py
index 0e2ed0c..290089b 100644
--- a/tests/contrib/sensors/test_hdfs_sensors.py
+++ b/tests/contrib/sensors/test_hdfs_sensors.py
@@ -26,8 +26,8 @@ class HdfsSensorFolderTests(unittest.TestCase):
             raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
         from tests.core import FakeHDFSHook
         self.hook = FakeHDFSHook
-        self.logger = logging.getLogger()
-        self.logger.setLevel(logging.DEBUG)
+        self.log = logging.getLogger()
+        self.log.setLevel(logging.DEBUG)
 
     def test_should_be_empty_directory(self):
         """
@@ -35,9 +35,9 @@ class HdfsSensorFolderTests(unittest.TestCase):
         :return:
         """
         # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
         task = HdfsSensorFolder(task_id='Should_be_empty_directory',
                                 filepath='/datadirectory/empty_directory',
                                 be_empty=True,
@@ -58,9 +58,9 @@ class HdfsSensorFolderTests(unittest.TestCase):
         :return:
         """
         # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
         task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
                                 filepath='/datadirectory/not_empty_directory',
                                 be_empty=True,
@@ -80,9 +80,9 @@ class HdfsSensorFolderTests(unittest.TestCase):
         :return:
         """
         # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
         task = HdfsSensorFolder(task_id='Should_be_non_empty_directory',
                                 filepath='/datadirectory/not_empty_directory',
                                 timeout=1,
@@ -102,9 +102,9 @@ class HdfsSensorFolderTests(unittest.TestCase):
         :return:
         """
         # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
         task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
                                 filepath='/datadirectory/empty_directory',
                                 timeout=1,
@@ -124,8 +124,8 @@ class HdfsSensorRegexTests(unittest.TestCase):
             raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
         from tests.core import FakeHDFSHook
         self.hook = FakeHDFSHook
-        self.logger = logging.getLogger()
-        self.logger.setLevel(logging.DEBUG)
+        self.log = logging.getLogger()
+        self.log.setLevel(logging.DEBUG)
 
     def test_should_match_regex(self):
         """
@@ -133,9 +133,9 @@ class HdfsSensorRegexTests(unittest.TestCase):
         :return:
         """
         # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
         compiled_regex = re.compile("test[1-2]file")
         task = HdfsSensorRegex(task_id='Should_match_the_regex',
                                filepath='/datadirectory/regex_dir',
@@ -157,9 +157,9 @@ class HdfsSensorRegexTests(unittest.TestCase):
         :return:
         """
         # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
         compiled_regex = re.compile("^IDoNotExist")
         task = HdfsSensorRegex(task_id='Should_not_match_the_regex',
                                filepath='/datadirectory/regex_dir',
@@ -180,9 +180,9 @@ class HdfsSensorRegexTests(unittest.TestCase):
         :return:
         """
         # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
         compiled_regex = re.compile("test[1-2]file")
         task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize',
                                filepath='/datadirectory/regex_dir',
@@ -207,9 +207,9 @@ class HdfsSensorRegexTests(unittest.TestCase):
         :return:
         """
         # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
         compiled_regex = re.compile("test[1-2]file")
         task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
                                filepath='/datadirectory/regex_dir',
@@ -231,9 +231,9 @@ class HdfsSensorRegexTests(unittest.TestCase):
         :return:
         """
         # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
+        self.log.debug('#' * 10)
+        self.log.debug('Running %s', self._testMethodName)
+        self.log.debug('#' * 10)
         compiled_regex = re.compile("copying_file_\d+.txt")
         task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
                                filepath='/datadirectory/regex_dir',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/executors/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_executor.py b/tests/executors/test_executor.py
index 9ec6cd4..a0e227c 100644
--- a/tests/executors/test_executor.py
+++ b/tests/executors/test_executor.py
@@ -29,8 +29,8 @@ class TestExecutor(BaseExecutor):
         super(TestExecutor, self).__init__(*args, **kwargs)
 
     def execute_async(self, key, command, queue=None):
-        self.logger.debug("{} running task instances".format(len(self.running)))
-        self.logger.debug("{} in queue".format(len(self.queued_tasks)))
+        self.log.debug("{} running task instances".format(len(self.running)))
+        self.log.debug("{} in queue".format(len(self.queued_tasks)))
 
     def heartbeat(self):
         session = settings.Session()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index 9b256e6..ee67524 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -75,7 +75,7 @@ class TimeoutTestSensor(BaseSensorOperator):
                 else:
                     raise AirflowSensorTimeout('Snap. Time is OUT.')
             time.sleep(self.poke_interval)
-        self.logger.info("Success criteria met. Exiting.")
+        self.log.info("Success criteria met. Exiting.")
 
 
 class SensorTimeoutTest(unittest.TestCase):
@@ -187,7 +187,7 @@ class HttpSensorTests(unittest.TestCase):
             poke_interval=1
         )
 
-        with mock.patch.object(task.hook.logger, 'error') as mock_errors:
+        with mock.patch.object(task.hook.log, 'error') as mock_errors:
             with self.assertRaises(AirflowSensorTimeout):
                 task.execute(None)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/test_utils/reset_warning_registry.py
----------------------------------------------------------------------
diff --git a/tests/test_utils/reset_warning_registry.py b/tests/test_utils/reset_warning_registry.py
new file mode 100644
index 0000000..a275a6d
--- /dev/null
+++ b/tests/test_utils/reset_warning_registry.py
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import re
+import sys
+
+
+# We need to explicitly clear the warning registry context
+# https://docs.python.org/2/library/warnings.html
+# One thing to be aware of is that if a warning has already been raised because
+# of a once/default rule, then no matter what filters are set the warning will
+# not be seen again unless the warnings registry related to the warning has
+# been cleared.
+#
+# Proposed fix from Stack Overflow, which refers to the Python bug page
+# noqa
+# https://stackoverflow.com/questions/19428761/python-showing-once-warnings-again-resetting-all-warning-registries
+class reset_warning_registry(object):
+    """
+    Context manager that archives and clears ``__warningregistry__`` of the
+    loaded modules on entry and restores the archived contents on exit.
+
+    :param pattern:
+          optional regex; only modules whose names match it from the start
+          (``re.match``) are reset. Defaults to ``".*"`` (every module).
+    """
+
+    #: compiled regex selecting which modules are reset (set in __init__)
+    _pattern = None
+
+    #: dict mapping module name -> copy of its registry, filled by __enter__
+    _backup = None
+
+    def __init__(self, pattern=None):
+        self._pattern = re.compile(pattern or ".*")
+
+    def __enter__(self):
+        # Archive, then empty, every non-empty __warningregistry__ of the
+        # modules matching the pattern; list() snapshots sys.modules first.
+        pattern = self._pattern
+        backup = self._backup = {}
+        for name, mod in list(sys.modules.items()):
+            if pattern.match(name):
+                reg = getattr(mod, "__warningregistry__", None)
+                if reg:
+                    backup[name] = reg.copy()
+                    reg.clear()
+        return self
+
+    def __exit__(self, *exc_info):
+        # Restore archived registries; modules no longer loaded are skipped.
+        modules = sys.modules
+        backup = self._backup
+        for name, content in backup.items():
+            mod = modules.get(name)
+            if mod is None:
+                continue
+            reg = getattr(mod, "__warningregistry__", None)
+            if reg is None:
+                setattr(mod, "__warningregistry__", content)
+            else:
+                reg.clear()
+                reg.update(content)
+
+        # clear registries of matching modules that had nothing to archive
+        pattern = self._pattern
+        for name, mod in list(modules.items()):
+            if pattern.match(name) and name not in backup:
+                reg = getattr(mod, "__warningregistry__", None)
+                if reg:
+                    reg.clear()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/utils/log/test_logging.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_logging.py b/tests/utils/log/test_logging.py
index 7e05c7d..8df6dfc 100644
--- a/tests/utils/log/test_logging.py
+++ b/tests/utils/log/test_logging.py
@@ -41,7 +41,7 @@ class TestS3TaskHandler(unittest.TestCase):
     def test_init_raises(self):
         self.hook_mock.side_effect = Exception('Failed to connect')
         handler = S3TaskHandler()
-        with mock.patch.object(handler.logger, 'error') as mock_error:
+        with mock.patch.object(handler.log, 'error') as mock_error:
             # Initialize the hook
             handler.hook()
             mock_error.assert_called_once_with(
@@ -81,7 +81,7 @@ class TestS3TaskHandler(unittest.TestCase):
     def test_read_raises_return_error(self):
         self.hook_inst_mock.get_key.side_effect = Exception('error')
         handler = S3TaskHandler()
-        with mock.patch.object(handler.logger, 'error') as mock_error:
+        with mock.patch.object(handler.log, 'error') as mock_error:
             result = handler.s3_log_read(
                 self.remote_log_location,
                 return_error=True
@@ -102,7 +102,7 @@ class TestS3TaskHandler(unittest.TestCase):
     def test_write_raises(self):
         self.hook_inst_mock.load_string.side_effect = Exception('error')
         handler = S3TaskHandler()
-        with mock.patch.object(handler.logger, 'error') as mock_error:
+        with mock.patch.object(handler.log, 'error') as mock_error:
             handler.write('text', self.remote_log_location)
             msg = 'Could not write logs to %s' % self.remote_log_location
             mock_error.assert_called_once_with(msg)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/tests/utils/test_logging_mixin.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_logging_mixin.py b/tests/utils/test_logging_mixin.py
new file mode 100644
index 0000000..bf9e225
--- /dev/null
+++ b/tests/utils/test_logging_mixin.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import warnings
+
+from airflow.operators.bash_operator import BashOperator
+from tests.test_utils.reset_warning_registry import reset_warning_registry
+
+
+class TestLoggingMixin(unittest.TestCase):
+    def setUp(self):
+        warnings.filterwarnings(
+            action='always'
+        )
+
+    def test_log(self):
+        op = BashOperator(
+            task_id='task-1',
+            bash_command='exit 0'
+        )
+        with reset_warning_registry():
+            with warnings.catch_warnings(record=True) as w:
+                # setUp set the 'always' filter and the registry is reset, so
+                # reading the deprecated ``logger`` must record one warning
+                op.logger.info('Some arbitrary line')
+
+                self.assertEqual(len(w), 1)
+
+                warning = w[0]
+                self.assertTrue(issubclass(warning.category, DeprecationWarning))
+                self.assertEqual(
+                    'Initializing logger for airflow.operators.bash_operator.BashOperator'
+                    ' using logger(), which will be replaced by .log in Airflow 2.0',
+                    str(warning.message)
+                )
+
+    def tearDown(self):
+        warnings.resetwarnings()  # drops every filter, not only setUp's


[3/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log

Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/sftp_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sftp_operator.py b/airflow/contrib/operators/sftp_operator.py
index 5abfc51..44ea66d 100644
--- a/airflow/contrib/operators/sftp_operator.py
+++ b/airflow/contrib/operators/sftp_operator.py
@@ -81,12 +81,12 @@ class SFTPOperator(BaseOperator):
             if self.operation.lower() == SFTPOperation.GET:
                 file_msg = "from {0} to {1}".format(self.remote_filepath,
                                                     self.local_filepath)
-                self.logger.debug("Starting to transfer %s", file_msg)
+                self.log.debug("Starting to transfer %s", file_msg)
                 sftp_client.get(self.remote_filepath, self.local_filepath)
             else:
                 file_msg = "from {0} to {1}".format(self.local_filepath,
                                                     self.remote_filepath)
-                self.logger.debug("Starting to transfer file %s", file_msg)
+                self.log.debug("Starting to transfer file %s", file_msg)
                 sftp_client.put(self.local_filepath, self.remote_filepath)
 
         except Exception as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/vertica_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py
index fc9cf3b..7a319f2 100644
--- a/airflow/contrib/operators/vertica_operator.py
+++ b/airflow/contrib/operators/vertica_operator.py
@@ -39,6 +39,6 @@ class VerticaOperator(BaseOperator):
         self.sql = sql
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         hook = VerticaHook(vertica_conn_id=self.vertica_conn_id)
         hook.run(self.sql)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/vertica_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py
index 35ff3c6..5e769fc 100644
--- a/airflow/contrib/operators/vertica_to_hive.py
+++ b/airflow/contrib/operators/vertica_to_hive.py
@@ -103,7 +103,7 @@ class VerticaToHiveTransfer(BaseOperator):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
 
-        self.logger.info("Dumping Vertica query results to local file")
+        self.log.info("Dumping Vertica query results to local file")
         conn = vertica.get_conn()
         cursor = conn.cursor()
         cursor.execute(self.sql)
@@ -119,7 +119,7 @@ class VerticaToHiveTransfer(BaseOperator):
             f.flush()
             cursor.close()
             conn.close()
-            self.logger.info("Loading file into Hive")
+            self.log.info("Loading file into Hive")
             hive.load_file(
                 f.name,
                 self.hive_table,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
index 630cebe..90a3264 100644
--- a/airflow/contrib/sensors/bigquery_sensor.py
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -59,7 +59,7 @@ class BigQueryTableSensor(BaseSensorOperator):
 
     def poke(self, context):
         table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
-        self.logger.info('Sensor checks existence of table: %s', table_uri)
+        self.log.info('Sensor checks existence of table: %s', table_uri)
         hook = BigQueryHook(
             bigquery_conn_id=self.bigquery_conn_id,
             delegate_to=self.delegate_to)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py
index 4ee45f9..e1a9169 100644
--- a/airflow/contrib/sensors/datadog_sensor.py
+++ b/airflow/contrib/sensors/datadog_sensor.py
@@ -67,7 +67,7 @@ class DatadogSensor(BaseSensorOperator):
             tags=self.tags)
 
         if isinstance(response, dict) and response.get('status', 'ok') != 'ok':
-            self.logger.error("Unexpected Datadog result: %s", response)
+            self.log.error("Unexpected Datadog result: %s", response)
             raise AirflowException("Datadog returned unexpected result")
 
         if self.response_check:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
index 034fcb6..3ecaa42 100644
--- a/airflow/contrib/sensors/emr_base_sensor.py
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -36,11 +36,11 @@ class EmrBaseSensor(BaseSensorOperator):
         response = self.get_emr_response()
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
-            self.logger.info('Bad HTTP response: %s', response)
+            self.log.info('Bad HTTP response: %s', response)
             return False
 
         state = self.state_from_response(response)
-        self.logger.info('Job flow currently %s', state)
+        self.log.info('Job flow currently %s', state)
 
         if state in self.NON_TERMINAL_STATES:
             return False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
index e5610a1..87b65c8 100644
--- a/airflow/contrib/sensors/emr_job_flow_sensor.py
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -41,7 +41,7 @@ class EmrJobFlowSensor(EmrBaseSensor):
     def get_emr_response(self):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        self.logger.info('Poking cluster %s', self.job_flow_id)
+        self.log.info('Poking cluster %s', self.job_flow_id)
         return emr.describe_cluster(ClusterId=self.job_flow_id)
 
     def state_from_response(self, response):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
index e131d77..003d2d1 100644
--- a/airflow/contrib/sensors/emr_step_sensor.py
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -45,7 +45,7 @@ class EmrStepSensor(EmrBaseSensor):
     def get_emr_response(self):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        self.logger.info('Poking step %s on cluster %s', self.step_id, self.job_flow_id)
+        self.log.info('Poking step %s on cluster %s', self.step_id, self.job_flow_id)
         return emr.describe_step(ClusterId=self.job_flow_id, StepId=self.step_id)
 
     def state_from_response(self, response):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
index 2e604e9..bd66c32 100644
--- a/airflow/contrib/sensors/ftp_sensor.py
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -42,7 +42,7 @@ class FTPSensor(BaseSensorOperator):
 
     def poke(self, context):
         with self._create_hook() as hook:
-            self.logger.info('Poking for %s', self.path)
+            self.log.info('Poking for %s', self.path)
             try:
                 hook.get_mod_time(self.path)
             except ftplib.error_perm as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index 800c1bd..384e26f 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -54,7 +54,7 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator):
         self.delegate_to = delegate_to
 
     def poke(self, context):
-        self.logger.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+        self.log.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
         hook = GoogleCloudStorageHook(
             google_cloud_storage_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to)
@@ -116,7 +116,7 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
         self.delegate_to = delegate_to
 
     def poke(self, context):
-        self.logger.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+        self.log.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
         hook = GoogleCloudStorageHook(
             google_cloud_storage_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py
index 11e8b07..1893f01 100644
--- a/airflow/contrib/sensors/hdfs_sensors.py
+++ b/airflow/contrib/sensors/hdfs_sensors.py
@@ -28,7 +28,7 @@ class HdfsSensorRegex(HdfsSensor):
         :return: Bool depending on the search criteria
         """
         sb = self.hook(self.hdfs_conn_id).get_conn()
-        self.logger.info(
+        self.log.info(
             'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals())
         )
         result = [f for f in sb.ls([self.filepath], include_toplevel=False) if
@@ -56,10 +56,10 @@ class HdfsSensorFolder(HdfsSensor):
         result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
         result = self.filter_for_filesize(result, self.file_size)
         if self.be_empty:
-            self.logger.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals()))
+            self.log.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals()))
             return len(result) == 1 and result[0]['path'] == self.filepath
         else:
-            self.logger.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
+            self.log.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
             result.pop(0)
             return bool(result) and result[0]['file_type'] == 'f'
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/jira_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py
index 4cbc676..1dc7b50 100644
--- a/airflow/contrib/sensors/jira_sensor.py
+++ b/airflow/contrib/sensors/jira_sensor.py
@@ -97,7 +97,7 @@ class JiraTicketSensor(JiraSensor):
                                                *args, **kwargs)
 
     def poke(self, context):
-        self.logger.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
+        self.log.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
 
         self.jira_operator.method_name = "issue"
         self.jira_operator.jira_method_args = {
@@ -123,19 +123,19 @@ class JiraTicketSensor(JiraSensor):
                             and getattr(field_value, 'name'):
                         result = self.expected_value.lower() == field_value.name.lower()
                     else:
-                        self.logger.warning(
+                        self.log.warning(
                             "Not implemented checker for issue field %s which "
                             "is neither string nor list nor Jira Resource",
                             self.field
                         )
 
         except JIRAError as jira_error:
-            self.logger.error("Jira error while checking with expected value: %s", jira_error)
+            self.log.error("Jira error while checking with expected value: %s", jira_error)
         except Exception as e:
-            self.logger.error("Error while checking with expected value %s:", self.expected_value)
-            self.logger.exception(e)
+            self.log.error("Error while checking with expected value %s:", self.expected_value)
+            self.log.exception(e)
         if result is True:
-            self.logger.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value)
+            self.log.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value)
         else:
-            self.logger.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value)
+            self.log.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value)
         return result

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/redis_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py
index 220d766..6cc314b 100644
--- a/airflow/contrib/sensors/redis_key_sensor.py
+++ b/airflow/contrib/sensors/redis_key_sensor.py
@@ -38,5 +38,5 @@ class RedisKeySensor(BaseSensorOperator):
         self.key = key
 
     def poke(self, context):
-        self.logger.info('Sensor check existence of key: %s', self.key)
+        self.log.info('Sensor check existence of key: %s', self.key)
         return RedisHook(self.redis_conn_id).key_exists(self.key)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/wasb_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py
index 1a54e12..4295a25 100644
--- a/airflow/contrib/sensors/wasb_sensor.py
+++ b/airflow/contrib/sensors/wasb_sensor.py
@@ -47,7 +47,7 @@ class WasbBlobSensor(BaseSensorOperator):
         self.check_options = check_options
 
     def poke(self, context):
-        self.logger.info(
+        self.log.info(
             'Poking for blob: {self.blob_name}\n'
             'in wasb://{self.container_name}'.format(**locals())
         )
@@ -85,7 +85,7 @@ class WasbPrefixSensor(BaseSensorOperator):
         self.check_options = check_options
 
     def poke(self, context):
-        self.logger.info(
+        self.log.info(
             'Poking for prefix: {self.prefix}\n'
             'in wasb://{self.container_name}'.format(**locals())
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/task_runner/cgroup_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/task_runner/cgroup_task_runner.py b/airflow/contrib/task_runner/cgroup_task_runner.py
index 5d2518d..0022fd6 100644
--- a/airflow/contrib/task_runner/cgroup_task_runner.py
+++ b/airflow/contrib/task_runner/cgroup_task_runner.py
@@ -72,10 +72,10 @@ class CgroupTaskRunner(BaseTaskRunner):
         for path_element in path_split:
             name_to_node = {x.name: x for x in node.children}
             if path_element not in name_to_node:
-                self.logger.debug("Creating cgroup %s in %s", path_element, node.path)
+                self.log.debug("Creating cgroup %s in %s", path_element, node.path)
                 node = node.create_cgroup(path_element)
             else:
-                self.logger.debug(
+                self.log.debug(
                     "Not creating cgroup %s in %s since it already exists",
                     path_element, node.path
                 )
@@ -94,20 +94,20 @@ class CgroupTaskRunner(BaseTaskRunner):
         for path_element in path_split:
             name_to_node = {x.name: x for x in node.children}
             if path_element not in name_to_node:
-                self.logger.warning("Cgroup does not exist: %s", path)
+                self.log.warning("Cgroup does not exist: %s", path)
                 return
             else:
                 node = name_to_node[path_element]
         # node is now the leaf node
         parent = node.parent
-        self.logger.debug("Deleting cgroup %s/%s", parent, node.name)
+        self.log.debug("Deleting cgroup %s/%s", parent, node.name)
         parent.delete_cgroup(node.name)
 
     def start(self):
         # Use bash if it's already in a cgroup
         cgroups = self._get_cgroup_names()
         if cgroups["cpu"] != "/" or cgroups["memory"] != "/":
-            self.logger.debug(
+            self.log.debug(
                 "Already running in a cgroup (cpu: %s memory: %s) so not creating another one",
                 cgroups.get("cpu"), cgroups.get("memory")
             )
@@ -133,7 +133,7 @@ class CgroupTaskRunner(BaseTaskRunner):
         mem_cgroup_node = self._create_cgroup(self.mem_cgroup_name)
         self._created_mem_cgroup = True
         if self._mem_mb_limit > 0:
-            self.logger.debug(
+            self.log.debug(
                 "Setting %s with %s MB of memory",
                 self.mem_cgroup_name, self._mem_mb_limit
             )
@@ -143,14 +143,14 @@ class CgroupTaskRunner(BaseTaskRunner):
         cpu_cgroup_node = self._create_cgroup(self.cpu_cgroup_name)
         self._created_cpu_cgroup = True
         if self._cpu_shares > 0:
-            self.logger.debug(
+            self.log.debug(
                 "Setting %s with %s CPU shares",
                 self.cpu_cgroup_name, self._cpu_shares
             )
             cpu_cgroup_node.controller.shares = self._cpu_shares
 
         # Start the process w/ cgroups
-        self.logger.debug(
+        self.log.debug(
             "Starting task process with cgroups cpu,memory: %s",
             cgroup_name
         )
@@ -168,7 +168,7 @@ class CgroupTaskRunner(BaseTaskRunner):
         # I wasn't able to track down the root cause of the package install failures, but
         # we might want to revisit that approach at some other point.
         if return_code == 137:
-            self.logger.warning("Task failed with return code of 137. This may indicate "
+            self.log.warning("Task failed with return code of 137. This may indicate "
                               "that it was killed due to excessive memory usage. "
                               "Please consider optimizing your task or using the "
                               "resources argument to reserve more memory for your task")
@@ -176,7 +176,7 @@ class CgroupTaskRunner(BaseTaskRunner):
 
     def terminate(self):
         if self.process and psutil.pid_exists(self.process.pid):
-            kill_process_tree(self.logger, self.process.pid)
+            kill_process_tree(self.log, self.process.pid)
 
     def on_finish(self):
         # Let the OOM watcher thread know we're done to avoid false OOM alarms

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index 7812f96..c387eeb 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -19,7 +19,7 @@ from airflow.executors.local_executor import LocalExecutor
 from airflow.executors.sequential_executor import SequentialExecutor
 
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 DEFAULT_EXECUTOR = None
 
@@ -41,7 +41,7 @@ def GetDefaultExecutor():
 
     DEFAULT_EXECUTOR = _get_executor(executor_name)
 
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
     log.info("Using executor %s", executor_name)
 
     return DEFAULT_EXECUTOR

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 1197958..410a558 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -14,7 +14,7 @@
 from builtins import range
 
 from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
 PARALLELISM = configuration.getint('core', 'PARALLELISM')
@@ -46,7 +46,7 @@ class BaseExecutor(LoggingMixin):
     def queue_command(self, task_instance, command, priority=1, queue=None):
         key = task_instance.key
         if key not in self.queued_tasks and key not in self.running:
-            self.logger.info("Adding to queue: %s", command)
+            self.log.info("Adding to queue: %s", command)
             self.queued_tasks[key] = (command, priority, queue, task_instance)
 
     def queue_task_instance(
@@ -99,9 +99,9 @@ class BaseExecutor(LoggingMixin):
         else:
             open_slots = self.parallelism - len(self.running)
 
-        self.logger.debug("%s running task instances", len(self.running))
-        self.logger.debug("%s in queue", len(self.queued_tasks))
-        self.logger.debug("%s open slots", open_slots)
+        self.log.debug("%s running task instances", len(self.running))
+        self.log.debug("%s in queue", len(self.queued_tasks))
+        self.log.debug("%s open slots", open_slots)
 
         sorted_queue = sorted(
             [(k, v) for k, v in self.queued_tasks.items()],
@@ -122,13 +122,13 @@ class BaseExecutor(LoggingMixin):
                 self.running[key] = command
                 self.execute_async(key, command=command, queue=queue)
             else:
-                self.logger.debug(
+                self.log.debug(
                     'Task is already running, not sending to executor: %s',
                     key
                 )
 
         # Calling child class sync method
-        self.logger.debug("Calling the %s sync method", self.__class__)
+        self.log.debug("Calling the %s sync method", self.__class__)
         self.sync()
 
     def change_state(self, key, state):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 39c895c..360a276 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -24,7 +24,7 @@ from celery import states as celery_states
 from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 PARALLELISM = configuration.get('core', 'PARALLELISM')
 
@@ -53,7 +53,7 @@ class CeleryConfig(object):
     try:
         celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
     except AirflowConfigException as e:
-        log = LoggingMixin().logger
+        log = LoggingMixin().log
         log.warning("Celery Executor will run without SSL")
 
     try:
@@ -76,7 +76,7 @@ app = Celery(
 
 @app.task
 def execute_command(command):
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
     log.info("Executing command in Celery: %s", command)
     try:
         subprocess.check_call(command, shell=True)
@@ -99,13 +99,13 @@ class CeleryExecutor(BaseExecutor):
         self.last_state = {}
 
     def execute_async(self, key, command, queue=DEFAULT_QUEUE):
-        self.logger.info("[celery] queuing {key} through celery, queue={queue}".format(**locals()))
+        self.log.info("[celery] queuing {key} through celery, queue={queue}".format(**locals()))
         self.tasks[key] = execute_command.apply_async(
             args=[command], queue=queue)
         self.last_state[key] = celery_states.PENDING
 
     def sync(self):
-        self.logger.debug("Inquiring about %s celery task(s)", len(self.tasks))
+        self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
         for key, async in list(self.tasks.items()):
             try:
                 state = async.state
@@ -123,11 +123,11 @@ class CeleryExecutor(BaseExecutor):
                         del self.tasks[key]
                         del self.last_state[key]
                     else:
-                        self.logger.info("Unexpected state: %s", async.state)
+                        self.log.info("Unexpected state: %s", async.state)
                     self.last_state[key] = async.state
             except Exception as e:
-                self.logger.error("Error syncing the celery executor, ignoring it:")
-                self.logger.exception(e)
+                self.log.error("Error syncing the celery executor, ignoring it:")
+                self.log.exception(e)
 
     def end(self, synchronous=False):
         if synchronous:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index 8a56506..07b8a82 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -53,10 +53,10 @@ class DaskExecutor(BaseExecutor):
         if future.done():
             key = self.futures[future]
             if future.exception():
-                self.logger.error("Failed to execute task: %s", repr(future.exception()))
+                self.log.error("Failed to execute task: %s", repr(future.exception()))
                 self.fail(key)
             elif future.cancelled():
-                self.logger.error("Failed to execute task")
+                self.log.error("Failed to execute task")
                 self.fail(key)
             else:
                 self.success(key)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 9730737..f9eceb3 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -20,7 +20,7 @@ from builtins import range
 
 from airflow import configuration
 from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
 PARALLELISM = configuration.get('core', 'PARALLELISM')
@@ -40,14 +40,14 @@ class LocalWorker(multiprocessing.Process, LoggingMixin):
                 # Received poison pill, no more tasks to run
                 self.task_queue.task_done()
                 break
-            self.logger.info("%s running %s", self.__class__.__name__, command)
+            self.log.info("%s running %s", self.__class__.__name__, command)
             command = "exec bash -c '{0}'".format(command)
             try:
                 subprocess.check_call(command, shell=True)
                 state = State.SUCCESS
             except subprocess.CalledProcessError as e:
                 state = State.FAILED
-                self.logger.error("Failed to execute task %s.", str(e))
+                self.log.error("Failed to execute task %s.", str(e))
                 # TODO: Why is this commented out?
                 # raise e
             self.result_queue.put((key, state))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/sequential_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 7d08a4b..a15450d 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -37,14 +37,14 @@ class SequentialExecutor(BaseExecutor):
 
     def sync(self):
         for key, command in self.commands_to_run:
-            self.logger.info("Executing command: %s", command)
+            self.log.info("Executing command: %s", command)
 
             try:
                 subprocess.check_call(command, shell=True)
                 self.change_state(key, State.SUCCESS)
             except subprocess.CalledProcessError as e:
                 self.change_state(key, State.FAILED)
-                self.logger.error("Failed to execute task %s.", str(e))
+                self.log.error("Failed to execute task %s.", str(e))
 
         self.commands_to_run = []
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/S3_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index 2f7e6ee..c405001 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -16,7 +16,7 @@ from __future__ import division
 
 from future import standard_library
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 standard_library.install_aliases()
 import re
@@ -87,7 +87,7 @@ def _parse_s3_config(config_file_name, config_format='boto', profile=None):
             if Config.has_option(cred_section, 'calling_format'):
                 calling_format = Config.get(cred_section, 'calling_format')
         except:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.warning("Option Error in parsing s3 config file")
             raise
         return (access_key, secret_key, calling_format)
@@ -378,7 +378,7 @@ class S3Hook(BaseHook):
                     offset = chunk * multipart_bytes
                     bytes = min(multipart_bytes, key_size - offset)
                     with FileChunkIO(filename, 'r', offset=offset, bytes=bytes) as fp:
-                        self.logger.info('Sending chunk %s of %s...', chunk + 1, total_chunks)
+                        self.log.info('Sending chunk %s of %s...', chunk + 1, total_chunks)
                         mp.upload_part_from_file(fp, part_num=chunk + 1)
             except:
                 mp.cancel_upload()
@@ -391,7 +391,7 @@ class S3Hook(BaseHook):
             key_size = key_obj.set_contents_from_filename(filename,
                                                           replace=replace,
                                                           encrypt_key=encrypt)
-        self.logger.info(
+        self.log.info(
             "The key {key} now contains {key_size} bytes".format(**locals())
         )
 
@@ -432,6 +432,6 @@ class S3Hook(BaseHook):
         key_size = key_obj.set_contents_from_string(string_data,
                                                     replace=replace,
                                                     encrypt_key=encrypt)
-        self.logger.info(
+        self.log.info(
             "The key {key} now contains {key_size} bytes".format(**locals())
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py
index 4617b98..92313ca 100644
--- a/airflow/hooks/base_hook.py
+++ b/airflow/hooks/base_hook.py
@@ -23,7 +23,7 @@ import random
 from airflow import settings
 from airflow.models import Connection
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 CONN_ENV_PREFIX = 'AIRFLOW_CONN_'
 
@@ -76,7 +76,7 @@ class BaseHook(LoggingMixin):
     def get_connection(cls, conn_id):
         conn = random.choice(cls.get_connections(conn_id))
         if conn.host:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.info("Using connection to: %s", conn.host)
         return conn
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/dbapi_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py
index 85eebd0..bbdedd7 100644
--- a/airflow/hooks/dbapi_hook.py
+++ b/airflow/hooks/dbapi_hook.py
@@ -158,7 +158,7 @@ class DbApiHook(BaseHook):
                 for s in sql:
                     if sys.version_info[0] < 3:
                         s = s.encode('utf-8')
-                    self.logger.info(s)
+                    self.log.info(s)
                     if parameters is not None:
                         cur.execute(s, parameters)
                     else:
@@ -216,12 +216,12 @@ class DbApiHook(BaseHook):
                     cur.execute(sql, values)
                     if commit_every and i % commit_every == 0:
                         conn.commit()
-                        self.logger.info(
+                        self.log.info(
                             "Loaded {i} into {table} rows so far".format(**locals())
                         )
 
             conn.commit()
-        self.logger.info(
+        self.log.info(
             "Done loading. Loaded a total of {i} rows".format(**locals()))
 
     @staticmethod

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index af3ae9b..0b13670 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -68,7 +68,7 @@ class DruidHook(BaseHook):
         while running:
             req_status = requests.get("{0}/{1}/status".format(url, druid_task_id))
 
-            self.logger.info("Job still running for %s seconds...", sec)
+            self.log.info("Job still running for %s seconds...", sec)
 
             sec = sec + 1
 
@@ -87,4 +87,4 @@ class DruidHook(BaseHook):
             else:
                 raise AirflowException('Could not get status of the job, got %s', status)
 
-        self.logger.info('Successful index')
+        self.log.info('Successful index')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 70d7642..7c73491 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -201,7 +201,7 @@ class HiveCliHook(BaseHook):
                 hive_cmd.extend(['-f', f.name])
 
                 if verbose:
-                    self.logger.info(" ".join(hive_cmd))
+                    self.log.info(" ".join(hive_cmd))
                 sp = subprocess.Popen(
                     hive_cmd,
                     stdout=subprocess.PIPE,
@@ -215,7 +215,7 @@ class HiveCliHook(BaseHook):
                         break
                     stdout += line.decode('UTF-8')
                     if verbose:
-                        self.logger.info(line.decode('UTF-8').strip())
+                        self.log.info(line.decode('UTF-8').strip())
                 sp.wait()
 
                 if sp.returncode:
@@ -246,7 +246,7 @@ class HiveCliHook(BaseHook):
             for query in query_set:
 
                 query_preview = ' '.join(query.split())[:50]
-                self.logger.info("Testing HQL [%s (...)]", query_preview)
+                self.log.info("Testing HQL [%s (...)]", query_preview)
                 if query_set == insert:
                     query = other + '; explain ' + query
                 else:
@@ -255,16 +255,16 @@ class HiveCliHook(BaseHook):
                     self.run_cli(query, verbose=False)
                 except AirflowException as e:
                     message = e.args[0].split('\n')[-2]
-                    self.logger.info(message)
+                    self.log.info(message)
                     error_loc = re.search('(\d+):(\d+)', message)
                     if error_loc and error_loc.group(1).isdigit():
                         l = int(error_loc.group(1))
                         begin = max(l-2, 0)
                         end = min(l+3, len(query.split('\n')))
                         context = '\n'.join(query.split('\n')[begin:end])
-                        self.logger.info("Context :\n %s", context)
+                        self.log.info("Context :\n %s", context)
                 else:
-                    self.logger.info("SUCCESS")
+                    self.log.info("SUCCESS")
 
     def load_df(
             self,
@@ -397,7 +397,7 @@ class HiveCliHook(BaseHook):
                 hql += "TBLPROPERTIES({tprops})\n"
         hql += ";"
         hql = hql.format(**locals())
-        self.logger.info(hql)
+        self.log.info(hql)
         self.run_cli(hql)
         hql = "LOAD DATA LOCAL INPATH '{filepath}' "
         if overwrite:
@@ -408,7 +408,7 @@ class HiveCliHook(BaseHook):
                 ["{0}='{1}'".format(k, v) for k, v in partition.items()])
             hql += "PARTITION ({pvals});"
         hql = hql.format(**locals())
-        self.logger.info(hql)
+        self.log.info(hql)
         self.run_cli(hql)
 
     def kill(self):
@@ -662,7 +662,7 @@ class HiveServer2Hook(BaseHook):
 
         # impyla uses GSSAPI instead of KERBEROS as a auth_mechanism identifier
         if auth_mechanism == 'KERBEROS':
-            self.logger.warning(
+            self.log.warning(
                 "Detected deprecated 'KERBEROS' for authMechanism for %s. Please use 'GSSAPI' instead",
                 self.hiveserver2_conn_id
             )
@@ -696,7 +696,7 @@ class HiveServer2Hook(BaseHook):
                     # may be `SET` or DDL
                     records = cur.fetchall()
                 except ProgrammingError:
-                    self.logger.debug("get_results returned no records")
+                    self.log.debug("get_results returned no records")
                 if records:
                     results = {
                         'data': records,
@@ -716,7 +716,7 @@ class HiveServer2Hook(BaseHook):
         schema = schema or 'default'
         with self.get_conn(schema) as conn:
             with conn.cursor() as cur:
-                self.logger.info("Running query: %s", hql)
+                self.log.info("Running query: %s", hql)
                 cur.execute(hql)
                 schema = cur.description
                 with open(csv_filepath, 'wb') as f:
@@ -734,8 +734,8 @@ class HiveServer2Hook(BaseHook):
 
                         writer.writerows(rows)
                         i += len(rows)
-                        self.logger.info("Written %s rows so far.", i)
-                    self.logger.info("Done. Loaded a total of %s rows.", i)
+                        self.log.info("Written %s rows so far.", i)
+                    self.log.info("Done. Loaded a total of %s rows.", i)
 
     def get_records(self, hql, schema='default'):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/http_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/http_hook.py b/airflow/hooks/http_hook.py
index f168bc8..b8075a0 100644
--- a/airflow/hooks/http_hook.py
+++ b/airflow/hooks/http_hook.py
@@ -82,7 +82,7 @@ class HttpHook(BaseHook):
                                    headers=headers)
 
         prepped_request = session.prepare_request(req)
-        self.logger.info("Sending '%s' to url: %s", self.method, url)
+        self.log.info("Sending '%s' to url: %s", self.method, url)
         return self.run_and_check(session, prepped_request, extra_options)
 
     def run_and_check(self, session, prepped_request, extra_options):
@@ -107,12 +107,12 @@ class HttpHook(BaseHook):
             # Tried rewrapping, but not supported. This way, it's possible
             # to get reason and code for failure by checking first 3 chars
             # for the code, or do a split on ':'
-            self.logger.error("HTTP error: %s", response.reason)
+            self.log.error("HTTP error: %s", response.reason)
             if self.method not in ('GET', 'HEAD'):
                 # The sensor uses GET, so this prevents filling up the log
                 # with the body every time the GET 'misses'.
                 # That's ok to do, because GETs should be repeatable and
                 # all data should be visible in the log (no post data)
-                self.logger.error(response.text)
+                self.log.error(response.text)
             raise AirflowException(str(response.status_code)+":"+response.reason)
         return response

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/oracle_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/oracle_hook.py b/airflow/hooks/oracle_hook.py
index f439daa..71c67e0 100644
--- a/airflow/hooks/oracle_hook.py
+++ b/airflow/hooks/oracle_hook.py
@@ -101,11 +101,11 @@ class OracleHook(DbApiHook):
             cur.execute(sql)
             if i % commit_every == 0:
                 conn.commit()
-                self.logger.info('Loaded {i} into {table} rows so far'.format(**locals()))
+                self.log.info('Loaded {i} into {table} rows so far'.format(**locals()))
         conn.commit()
         cur.close()
         conn.close()
-        self.logger.info('Done loading. Loaded a total of {i} rows'.format(**locals()))
+        self.log.info('Done loading. Loaded a total of {i} rows'.format(**locals()))
 
     def bulk_insert_rows(self, table, rows, target_fields=None, commit_every=5000):
         """A performant bulk insert for cx_Oracle that uses prepared statements via `executemany()`.
@@ -129,13 +129,13 @@ class OracleHook(DbApiHook):
                 cursor.prepare(prepared_stm)
                 cursor.executemany(None, row_chunk)
                 conn.commit()
-                self.logger.info('[%s] inserted %s rows', table, row_count)
+                self.log.info('[%s] inserted %s rows', table, row_count)
                 # Empty chunk
                 row_chunk = []
         # Commit the leftover chunk
         cursor.prepare(prepared_stm)
         cursor.executemany(None, row_chunk)
         conn.commit()
-        self.logger.info('[%s] inserted %s rows', table, row_count)
+        self.log.info('[%s] inserted %s rows', table, row_count)
         cursor.close()
         conn.close()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/pig_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/pig_hook.py b/airflow/hooks/pig_hook.py
index 29beb54..276b37a 100644
--- a/airflow/hooks/pig_hook.py
+++ b/airflow/hooks/pig_hook.py
@@ -62,7 +62,7 @@ class PigCliHook(BaseHook):
                     pig_properties_list = self.pig_properties.split()
                     pig_cmd.extend(pig_properties_list)
                 if verbose:
-                    self.logger.info(" ".join(pig_cmd))
+                    self.log.info(" ".join(pig_cmd))
                 sp = subprocess.Popen(
                     pig_cmd,
                     stdout=subprocess.PIPE,
@@ -73,7 +73,7 @@ class PigCliHook(BaseHook):
                 for line in iter(sp.stdout.readline, ''):
                     stdout += line
                     if verbose:
-                        self.logger.info(line.strip())
+                        self.log.info(line.strip())
                 sp.wait()
 
                 if sp.returncode:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/webhdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py
index e7df328..4510d29 100644
--- a/airflow/hooks/webhdfs_hook.py
+++ b/airflow/hooks/webhdfs_hook.py
@@ -17,14 +17,14 @@ from airflow import configuration
 
 from hdfs import InsecureClient, HdfsError
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 _kerberos_security_mode = configuration.get("core", "security") == "kerberos"
 if _kerberos_security_mode:
     try:
         from hdfs.ext.kerberos import KerberosClient
     except ImportError:
-        log = LoggingMixin().logger
+        log = LoggingMixin().log
         log.error("Could not load the Kerberos extension for the WebHDFSHook.")
         raise
 from airflow.exceptions import AirflowException
@@ -49,7 +49,7 @@ class WebHDFSHook(BaseHook):
         nn_connections = self.get_connections(self.webhdfs_conn_id)
         for nn in nn_connections:
             try:
-                self.logger.debug('Trying namenode %s', nn.host)
+                self.log.debug('Trying namenode %s', nn.host)
                 connection_str = 'http://{nn.host}:{nn.port}'.format(nn=nn)
                 if _kerberos_security_mode:
                     client = KerberosClient(connection_str)
@@ -57,10 +57,10 @@ class WebHDFSHook(BaseHook):
                     proxy_user = self.proxy_user or nn.login
                     client = InsecureClient(connection_str, user=proxy_user)
                 client.status('/')
-                self.logger.debug('Using namenode %s for hook', nn.host)
+                self.log.debug('Using namenode %s for hook', nn.host)
                 return client
             except HdfsError as e:
-                self.logger.debug(
+                self.log.debug(
                     "Read operation on namenode {nn.host} failed witg error: {e.message}".format(**locals())
                 )
         nn_hosts = [c.host for c in nn_connections]
@@ -101,4 +101,4 @@ class WebHDFSHook(BaseHook):
                  overwrite=overwrite,
                  n_threads=parallelism,
                  **kwargs)
-        self.logger.debug("Uploaded file %s to %s", source, destination)
+        self.log.debug("Uploaded file %s to %s", source, destination)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/zendesk_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/zendesk_hook.py b/airflow/hooks/zendesk_hook.py
index 4634b22..533e9d0 100644
--- a/airflow/hooks/zendesk_hook.py
+++ b/airflow/hooks/zendesk_hook.py
@@ -37,7 +37,7 @@ class ZendeskHook(BaseHook):
         """
         retry_after = int(
             rate_limit_exception.response.headers.get('Retry-After', 60))
-        self.logger.info(
+        self.log.info(
             "Hit Zendesk API rate limit. Pausing for %s seconds",
             retry_after
         )
@@ -75,7 +75,7 @@ class ZendeskHook(BaseHook):
                     # `github.zendesk...`
                     # in it, but the call function needs it removed.
                     next_url = next_page.split(self.__url)[1]
-                    self.logger.info("Calling %s", next_url)
+                    self.log.info("Calling %s", next_url)
                     more_res = zendesk.call(next_url)
                     results.extend(more_res[key])
                     if next_page == more_res['next_page']:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f855320..3c79ed9 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -52,7 +52,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                           list_py_file_paths)
 from airflow.utils.db import provide_session, pessimistic_connection_handling
 from airflow.utils.email import send_email
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
 Base = models.Base
@@ -116,7 +116,7 @@ class BaseJob(Base, LoggingMixin):
         try:
             self.on_kill()
         except:
-            self.logger.error('on_kill() method failed')
+            self.log.error('on_kill() method failed')
         session.merge(job)
         session.commit()
         session.close()
@@ -179,7 +179,7 @@ class BaseJob(Base, LoggingMixin):
 
         self.heartbeat_callback(session=session)
         session.close()
-        self.logger.debug('[heart] Boom.')
+        self.log.debug('[heart] Boom.')
 
     def run(self):
         Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
@@ -271,7 +271,7 @@ class BaseJob(Base, LoggingMixin):
             ["{}".format(x) for x in reset_tis])
         session.commit()
 
-        self.logger.info(
+        self.log.info(
             "Reset the following %s TaskInstances:\n\t%s",
             len(reset_tis), task_instance_str
         )
@@ -358,7 +358,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
             # responsive file tailing
             parent_dir, _ = os.path.split(log_file)
 
-            _log = LoggingMixin().logger
+            _log = LoggingMixin().log
 
             # Create the parent directory for the log file if necessary.
             if not os.path.isdir(parent_dir):
@@ -438,7 +438,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
         # Arbitrarily wait 5s for the process to die
         self._process.join(5)
         if sigkill and self._process.is_alive():
-            self.logger.warning("Killing PID %s", self._process.pid)
+            self.log.warning("Killing PID %s", self._process.pid)
             os.kill(self._process.pid, signal.SIGKILL)
 
     @property
@@ -478,7 +478,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
         if not self._result_queue.empty():
             self._result = self._result_queue.get_nowait()
             self._done = True
-            self.logger.debug("Waiting for %s", self._process)
+            self.log.debug("Waiting for %s", self._process)
             self._process.join()
             return True
 
@@ -488,7 +488,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
             # Get the object from the queue or else join() can hang.
             if not self._result_queue.empty():
                 self._result = self._result_queue.get_nowait()
-            self.logger.debug("Waiting for %s", self._process)
+            self.log.debug("Waiting for %s", self._process)
             self._process.join()
             return True
 
@@ -578,7 +578,7 @@ class SchedulerJob(BaseJob):
         self.using_sqlite = False
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
             if self.max_threads > 1:
-                self.logger.error("Cannot use more than 1 thread when using sqlite. Setting max_threads to 1")
+                self.log.error("Cannot use more than 1 thread when using sqlite. Setting max_threads to 1")
             self.max_threads = 1
             self.using_sqlite = True
 
@@ -610,7 +610,7 @@ class SchedulerJob(BaseJob):
         tasks that should have succeeded in the past hour.
         """
         if not any([ti.sla for ti in dag.tasks]):
-            self.logger.info(
+            self.log.info(
                 "Skipping SLA check for %s because no tasks in DAG have SLAs",
                 dag
             )
@@ -693,7 +693,7 @@ class SchedulerJob(BaseJob):
             notification_sent = False
             if dag.sla_miss_callback:
                 # Execute the alert callback
-                self.logger.info(' --------------> ABOUT TO CALL SLA MISS CALL BACK ')
+                self.log.info(' --------------> ABOUT TO CALL SLA MISS CALL BACK ')
                 dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
                 notification_sent = True
             email_content = """\
@@ -843,7 +843,7 @@ class SchedulerJob(BaseJob):
                 task_start_dates = [t.start_date for t in dag.tasks]
                 if task_start_dates:
                     next_run_date = dag.normalize_schedule(min(task_start_dates))
-                    self.logger.debug(
+                    self.log.debug(
                         "Next run date based on tasks %s",
                         next_run_date
                     )
@@ -863,7 +863,7 @@ class SchedulerJob(BaseJob):
                 if next_run_date == dag.start_date:
                     next_run_date = dag.normalize_schedule(dag.start_date)
 
-                self.logger.debug(
+                self.log.debug(
                     "Dag start date: %s. Next run date: %s",
                     dag.start_date, next_run_date
                 )
@@ -914,17 +914,17 @@ class SchedulerJob(BaseJob):
         dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
         active_dag_runs = []
         for run in dag_runs:
-            self.logger.info("Examining DAG run %s", run)
+            self.log.info("Examining DAG run %s", run)
             # don't consider runs that are executed in the future
             if run.execution_date > datetime.now():
-                self.logger.error(
+                self.log.error(
                     "Execution date is in future: %s",
                     run.execution_date
                 )
                 continue
 
             if len(active_dag_runs) >= dag.max_active_runs:
-                self.logger.info("Active dag runs > max_active_run.")
+                self.log.info("Active dag runs > max_active_run.")
                 continue
 
             # skip backfill dagruns for now as long as they are not really scheduled
@@ -941,7 +941,7 @@ class SchedulerJob(BaseJob):
                 active_dag_runs.append(run)
 
         for run in active_dag_runs:
-            self.logger.debug("Examining active DAG run: %s", run)
+            self.log.debug("Examining active DAG run: %s", run)
             # this needs a fresh session sometimes tis get detached
             tis = run.get_task_instances(state=(State.NONE,
                                                 State.UP_FOR_RETRY))
@@ -962,7 +962,7 @@ class SchedulerJob(BaseJob):
                 if ti.are_dependencies_met(
                         dep_context=DepContext(flag_upstream_failed=True),
                         session=session):
-                    self.logger.debug('Queuing task: %s', ti)
+                    self.log.debug('Queuing task: %s', ti)
                     queue.append(ti.key)
 
         session.close()
@@ -1020,7 +1020,7 @@ class SchedulerJob(BaseJob):
             session.commit()
 
         if tis_changed > 0:
-            self.logger.warning(
+            self.log.warning(
                 "Set %s task instances to state=%s as their associated DagRun was not in RUNNING state",
                 tis_changed, new_state
             )
@@ -1069,13 +1069,13 @@ class SchedulerJob(BaseJob):
         task_instances_to_examine = ti_query.all()
 
         if len(task_instances_to_examine) == 0:
-            self.logger.info("No tasks to consider for execution.")
+            self.log.info("No tasks to consider for execution.")
             return executable_tis
 
         # Put one task instance on each line
         task_instance_str = "\n\t".join(
             ["{}".format(x) for x in task_instances_to_examine])
-        self.logger.info("Tasks up for execution:\n\t%s", task_instance_str)
+        self.log.info("Tasks up for execution:\n\t%s", task_instance_str)
 
         # Get the pool settings
         pools = {p.pool: p for p in session.query(models.Pool).all()}
@@ -1096,7 +1096,7 @@ class SchedulerJob(BaseJob):
                 open_slots = pools[pool].open_slots(session=session)
 
             num_queued = len(task_instances)
-            self.logger.info(
+            self.log.info(
                 "Figuring out tasks to run in Pool(name={pool}) with {open_slots} "
                 "open slots and {num_queued} task instances in queue".format(
                     **locals()
@@ -1111,7 +1111,7 @@ class SchedulerJob(BaseJob):
 
             for task_instance in priority_sorted_task_instances:
                 if open_slots <= 0:
-                    self.logger.info(
+                    self.log.info(
                         "Not scheduling since there are %s open slots in pool %s",
                         open_slots, pool
                     )
@@ -1133,12 +1133,12 @@ class SchedulerJob(BaseJob):
 
                 current_task_concurrency = dag_id_to_possibly_running_task_count[dag_id]
                 task_concurrency_limit = simple_dag_bag.get_dag(dag_id).concurrency
-                self.logger.info(
+                self.log.info(
                     "DAG %s has %s/%s running and queued tasks",
                     dag_id, current_task_concurrency, task_concurrency_limit
                 )
                 if current_task_concurrency >= task_concurrency_limit:
-                    self.logger.info(
+                    self.log.info(
                         "Not executing %s since the number of tasks running or queued from DAG %s"
                         " is >= to the DAG's task concurrency limit of %s",
                         task_instance, dag_id, task_concurrency_limit
@@ -1146,7 +1146,7 @@ class SchedulerJob(BaseJob):
                     continue
 
                 if self.executor.has_task(task_instance):
-                    self.logger.debug(
+                    self.log.debug(
                         "Not handling task %s as the executor reports it is running",
                         task_instance.key
                     )
@@ -1157,7 +1157,7 @@ class SchedulerJob(BaseJob):
 
         task_instance_str = "\n\t".join(
             ["{}".format(x) for x in executable_tis])
-        self.logger.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
+        self.log.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
         # so these dont expire on commit
         for ti in executable_tis:
             copy_dag_id = ti.dag_id
@@ -1208,7 +1208,7 @@ class SchedulerJob(BaseJob):
             .with_for_update()
             .all())
         if len(tis_to_set_to_queued) == 0:
-            self.logger.info("No tasks were able to have their state changed to queued.")
+            self.log.info("No tasks were able to have their state changed to queued.")
             session.commit()
             return []
 
@@ -1236,7 +1236,7 @@ class SchedulerJob(BaseJob):
 
         task_instance_str = "\n\t".join(
             ["{}".format(x) for x in tis_to_be_queued])
-        self.logger.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
+        self.log.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
         return tis_to_be_queued
 
     def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instances):
@@ -1268,7 +1268,7 @@ class SchedulerJob(BaseJob):
 
             priority = task_instance.priority_weight
             queue = task_instance.queue
-            self.logger.info(
+            self.log.info(
                 "Sending %s to executor with priority %s and queue %s",
                 task_instance.key, priority, queue
             )
@@ -1357,18 +1357,18 @@ class SchedulerJob(BaseJob):
         for dag in dags:
             dag = dagbag.get_dag(dag.dag_id)
             if dag.is_paused:
-                self.logger.info("Not processing DAG %s since it's paused", dag.dag_id)
+                self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
                 continue
 
             if not dag:
-                self.logger.error("DAG ID %s was not found in the DagBag", dag.dag_id)
+                self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id)
                 continue
 
-            self.logger.info("Processing %s", dag.dag_id)
+            self.log.info("Processing %s", dag.dag_id)
 
             dag_run = self.create_dag_run(dag)
             if dag_run:
-                self.logger.info("Created %s", dag_run)
+                self.log.info("Created %s", dag_run)
             self._process_task_instances(dag, tis_out)
             self.manage_slas(dag)
 
@@ -1384,7 +1384,7 @@ class SchedulerJob(BaseJob):
         """
         for key, executor_state in list(self.executor.get_event_buffer().items()):
             dag_id, task_id, execution_date = key
-            self.logger.info(
+            self.log.info(
                 "Executor reports %s.%s execution_date=%s as %s",
                 dag_id, task_id, execution_date, executor_state
             )
@@ -1453,10 +1453,10 @@ class SchedulerJob(BaseJob):
                    "\n" +
                    "=" * 80)
 
-        self.logger.info(log_str)
+        self.log.info(log_str)
 
     def _execute(self):
-        self.logger.info("Starting the scheduler")
+        self.log.info("Starting the scheduler")
         pessimistic_connection_handling()
 
         # DAGs can be pickled for easier remote execution by some executors
@@ -1469,16 +1469,16 @@ class SchedulerJob(BaseJob):
         # DAGs in parallel. By processing them in separate processes,
         # we can get parallelism and isolation from potentially harmful
         # user code.
-        self.logger.info("Processing files using up to %s processes at a time", self.max_threads)
-        self.logger.info("Running execute loop for %s seconds", self.run_duration)
-        self.logger.info("Processing each file at most %s times", self.num_runs)
-        self.logger.info("Process each file at most once every %s seconds", self.file_process_interval)
-        self.logger.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval)
+        self.log.info("Processing files using up to %s processes at a time", self.max_threads)
+        self.log.info("Running execute loop for %s seconds", self.run_duration)
+        self.log.info("Processing each file at most %s times", self.num_runs)
+        self.log.info("Process each file at most once every %s seconds", self.file_process_interval)
+        self.log.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval)
 
         # Build up a list of Python files that could contain DAGs
-        self.logger.info("Searching for files in %s", self.subdir)
+        self.log.info("Searching for files in %s", self.subdir)
         known_file_paths = list_py_file_paths(self.subdir)
-        self.logger.info("There are %s files in %s", len(known_file_paths), self.subdir)
+        self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
 
         def processor_factory(file_path, log_file_path):
             return DagFileProcessor(file_path,
@@ -1497,7 +1497,7 @@ class SchedulerJob(BaseJob):
         try:
             self._execute_helper(processor_manager)
         finally:
-            self.logger.info("Exited execute loop")
+            self.log.info("Exited execute loop")
 
             # Kill all child processes on exit since we don't want to leave
             # them as orphaned.
@@ -1511,22 +1511,22 @@ class SchedulerJob(BaseJob):
                 child_processes = [x for x in this_process.children(recursive=True)
                                    if x.is_running() and x.pid in pids_to_kill]
                 for child in child_processes:
-                    self.logger.info("Terminating child PID: %s", child.pid)
+                    self.log.info("Terminating child PID: %s", child.pid)
                     child.terminate()
                 # TODO: Remove magic number
                 timeout = 5
-                self.logger.info("Waiting up to %s seconds for processes to exit...", timeout)
+                self.log.info("Waiting up to %s seconds for processes to exit...", timeout)
                 try:
                     psutil.wait_procs(child_processes, timeout)
                 except psutil.TimeoutExpired:
-                    self.logger.debug("Ran out of time while waiting for processes to exit")
+                    self.log.debug("Ran out of time while waiting for processes to exit")
 
                 # Then SIGKILL
                 child_processes = [x for x in this_process.children(recursive=True)
                                    if x.is_running() and x.pid in pids_to_kill]
                 if len(child_processes) > 0:
                     for child in child_processes:
-                        self.logger.info("Killing child PID: %s", child.pid)
+                        self.log.info("Killing child PID: %s", child.pid)
                         child.kill()
                         child.wait()
 
@@ -1539,7 +1539,7 @@ class SchedulerJob(BaseJob):
         self.executor.start()
 
         session = settings.Session()
-        self.logger.info("Resetting orphaned tasks for active dag runs")
+        self.log.info("Resetting orphaned tasks for active dag runs")
         self.reset_state_for_orphaned_tasks(session=session)
         session.close()
 
@@ -1558,7 +1558,7 @@ class SchedulerJob(BaseJob):
         # For the execute duration, parse and schedule DAGs
         while (datetime.now() - execute_start_time).total_seconds() < \
                 self.run_duration or self.run_duration < 0:
-            self.logger.debug("Starting Loop...")
+            self.log.debug("Starting Loop...")
             loop_start_time = time.time()
 
             # Traverse the DAG directory for Python files containing DAGs
@@ -1568,23 +1568,23 @@ class SchedulerJob(BaseJob):
 
             if elapsed_time_since_refresh > self.dag_dir_list_interval:
                 # Build up a list of Python files that could contain DAGs
-                self.logger.info("Searching for files in %s", self.subdir)
+                self.log.info("Searching for files in %s", self.subdir)
                 known_file_paths = list_py_file_paths(self.subdir)
                 last_dag_dir_refresh_time = datetime.now()
-                self.logger.info("There are %s files in %s", len(known_file_paths), self.subdir)
+                self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
                 processor_manager.set_file_paths(known_file_paths)
 
-                self.logger.debug("Removing old import errors")
+                self.log.debug("Removing old import errors")
                 self.clear_nonexistent_import_errors(known_file_paths=known_file_paths)
 
             # Kick of new processes and collect results from finished ones
-            self.logger.info("Heartbeating the process manager")
+            self.log.info("Heartbeating the process manager")
             simple_dags = processor_manager.heartbeat()
 
             if self.using_sqlite:
                 # For the sqlite case w/ 1 thread, wait until the processor
                 # is finished to avoid concurrent access to the DB.
-                self.logger.debug("Waiting for processors to finish since we're using sqlite")
+                self.log.debug("Waiting for processors to finish since we're using sqlite")
                 processor_manager.wait_until_finished()
 
             # Send tasks for execution if available
@@ -1613,7 +1613,7 @@ class SchedulerJob(BaseJob):
                                              (State.SCHEDULED,))
 
             # Call hearbeats
-            self.logger.info("Heartbeating the executor")
+            self.log.info("Heartbeating the executor")
             self.executor.heartbeat()
 
             # Process events from the executor
@@ -1623,7 +1623,7 @@ class SchedulerJob(BaseJob):
             time_since_last_heartbeat = (datetime.now() -
                                          last_self_heartbeat_time).total_seconds()
             if time_since_last_heartbeat > self.heartrate:
-                self.logger.info("Heartbeating the scheduler")
+                self.log.info("Heartbeating the scheduler")
                 self.heartbeat()
                 last_self_heartbeat_time = datetime.now()
 
@@ -1636,13 +1636,13 @@ class SchedulerJob(BaseJob):
                 last_stat_print_time = datetime.now()
 
             loop_end_time = time.time()
-            self.logger.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
-            self.logger.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
+            self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
+            self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
             time.sleep(self._processor_poll_interval)
 
             # Exit early for a test mode
             if processor_manager.max_runs_reached():
-                self.logger.info("Exiting loop as all files have been processed %s times", self.num_runs)
+                self.log.info("Exiting loop as all files have been processed %s times", self.num_runs)
                 break
 
         # Stop any processors
@@ -1657,7 +1657,7 @@ class SchedulerJob(BaseJob):
                 all_files_processed = False
                 break
         if all_files_processed:
-            self.logger.info(
+            self.log.info(
                 "Deactivating DAGs that haven't been touched since %s",
                 execute_start_time.isoformat()
             )
@@ -1693,21 +1693,21 @@ class SchedulerJob(BaseJob):
         :return: a list of SimpleDags made from the Dags found in the file
         :rtype: list[SimpleDag]
         """
-        self.logger.info("Processing file %s for tasks to queue", file_path)
+        self.log.info("Processing file %s for tasks to queue", file_path)
         # As DAGs are parsed from this file, they will be converted into SimpleDags
         simple_dags = []
 
         try:
             dagbag = models.DagBag(file_path)
         except Exception:
-            self.logger.exception("Failed at reloading the DAG file %s", file_path)
+            self.log.exception("Failed at reloading the DAG file %s", file_path)
             Stats.incr('dag_file_refresh_error', 1, 1)
             return []
 
         if len(dagbag.dags) > 0:
-            self.logger.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path)
+            self.log.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path)
         else:
-            self.logger.warning("No viable dags retrieved from %s", file_path)
+            self.log.warning("No viable dags retrieved from %s", file_path)
             self.update_import_errors(session, dagbag)
             return []
 
@@ -1777,7 +1777,7 @@ class SchedulerJob(BaseJob):
                 ti.state = State.SCHEDULED
 
             # Also save this task instance to the DB.
-            self.logger.info("Creating / updating %s in ORM", ti)
+            self.log.info("Creating / updating %s in ORM", ti)
             session.merge(ti)
             session.commit()
 
@@ -1785,11 +1785,11 @@ class SchedulerJob(BaseJob):
         try:
             self.update_import_errors(session, dagbag)
         except Exception:
-            self.logger.exception("Error logging import errors!")
+            self.log.exception("Error logging import errors!")
         try:
             dagbag.kill_zombies()
         except Exception:
-            self.logger.exception("Error killing zombies!")
+            self.log.exception("Error killing zombies!")
 
         return simple_dags
 
@@ -1908,22 +1908,22 @@ class BackfillJob(BaseJob):
             ti.refresh_from_db()
             if ti.state == State.SUCCESS:
                 ti_status.succeeded.add(key)
-                self.logger.debug("Task instance %s succeeded. Don't rerun.", ti)
+                self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
                 ti_status.started.pop(key)
                 continue
             elif ti.state == State.SKIPPED:
                 ti_status.skipped.add(key)
-                self.logger.debug("Task instance %s skipped. Don't rerun.", ti)
+                self.log.debug("Task instance %s skipped. Don't rerun.", ti)
                 ti_status.started.pop(key)
                 continue
             elif ti.state == State.FAILED:
-                self.logger.error("Task instance %s failed", ti)
+                self.log.error("Task instance %s failed", ti)
                 ti_status.failed.add(key)
                 ti_status.started.pop(key)
                 continue
             # special case: if the task needs to run again put it back
             elif ti.state == State.UP_FOR_RETRY:
-                self.logger.warning("Task instance %s is up for retry", ti)
+                self.log.warning("Task instance %s is up for retry", ti)
                 ti_status.started.pop(key)
                 ti_status.to_run[key] = ti
             # special case: The state of the task can be set to NONE by the task itself
@@ -1932,7 +1932,7 @@ class BackfillJob(BaseJob):
             # for that as otherwise those tasks would fall outside of the scope of
             # the backfill suddenly.
             elif ti.state == State.NONE:
-                self.logger.warning(
+                self.log.warning(
                     "FIXME: task instance %s state was set to none externally or "
                     "reaching concurrency limits. Re-adding task to queue.",
                     ti
@@ -1953,7 +1953,7 @@ class BackfillJob(BaseJob):
 
         for key, state in list(executor.get_event_buffer().items()):
             if key not in started:
-                self.logger.warning(
+                self.log.warning(
                     "%s state %s not in started=%s",
                     key, state, started.values()
                 )
@@ -1962,14 +1962,14 @@ class BackfillJob(BaseJob):
             ti = started[key]
             ti.refresh_from_db()
 
-            self.logger.debug("Executor state: %s task %s", state, ti)
+            self.log.debug("Executor state: %s task %s", state, ti)
 
             if state == State.FAILED or state == State.SUCCESS:
                 if ti.state == State.RUNNING or ti.state == State.QUEUED:
                     msg = ("Executor reports task instance {} finished ({}) "
                            "although the task says its {}. Was the task "
                            "killed externally?".format(ti, state, ti.state))
-                    self.logger.error(msg)
+                    self.log.error(msg)
                     ti.handle_failure(msg)
 
     @provide_session
@@ -2083,9 +2083,9 @@ class BackfillJob(BaseJob):
             len(ti_status.skipped),
             len(ti_status.deadlocked),
             len(ti_status.not_ready))
-        self.logger.info(msg)
+        self.log.info(msg)
 
-        self.logger.debug(
+        self.log.debug(
             "Finished dag run loop iteration. Remaining tasks %s",
             ti_status.to_run.values()
         )
@@ -2118,7 +2118,7 @@ class BackfillJob(BaseJob):
 
         while ((len(ti_status.to_run) > 0 or len(ti_status.started) > 0) and
                 len(ti_status.deadlocked) == 0):
-            self.logger.debug("*** Clearing out not_ready list ***")
+            self.log.debug("*** Clearing out not_ready list ***")
             ti_status.not_ready.clear()
 
             # we need to execute the tasks bottom to top
@@ -2138,12 +2138,12 @@ class BackfillJob(BaseJob):
                     ignore_depends_on_past = (
                         self.ignore_first_depends_on_past and
                         ti.execution_date == (start_date or ti.start_date))
-                    self.logger.debug("Task instance to run %s state %s", ti, ti.state)
+                    self.log.debug("Task instance to run %s state %s", ti, ti.state)
 
                     # guard against externally modified tasks instances or
                     # in case max concurrency has been reached at task runtime
                     if ti.state == State.NONE:
-                        self.logger.warning(
+                        self.log.warning(
                             "FIXME: task instance {} state was set to None externally. This should not happen"
                         )
                         ti.set_state(State.SCHEDULED, session=session)
@@ -2152,27 +2152,27 @@ class BackfillJob(BaseJob):
                     # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:
                         ti_status.succeeded.add(key)
-                        self.logger.debug("Task instance %s succeeded. Don't rerun.", ti)
+                        self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         continue
                     elif ti.state == State.SKIPPED:
                         ti_status.skipped.add(key)
-                        self.logger.debug("Task instance %s skipped. Don't rerun.", ti)
+                        self.log.debug("Task instance %s skipped. Don't rerun.", ti)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         continue
                     elif ti.state == State.FAILED:
-                        self.logger.error("Task instance %s failed", ti)
+                        self.log.error("Task instance %s failed", ti)
                         ti_status.failed.add(key)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         continue
                     elif ti.state == State.UPSTREAM_FAILED:
-                        self.logger.error("Task instance %s upstream failed", ti)
+                        self.log.error("Task instance %s upstream failed", ti)
                         ti_status.failed.add(key)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
@@ -2194,12 +2194,12 @@ class BackfillJob(BaseJob):
                         ti.refresh_from_db(lock_for_update=True, session=session)
                         if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY:
                             if executor.has_task(ti):
-                                self.logger.debug(
+                                self.log.debug(
                                     "Task Instance %s already in executor waiting for queue to clear",
                                     ti
                                 )
                             else:
-                                self.logger.debug('Sending %s to executor', ti)
+                                self.log.debug('Sending %s to executor', ti)
                                 # Skip scheduled state, we are executing immediately
                                 ti.state = State.QUEUED
                                 session.merge(ti)
@@ -2216,7 +2216,7 @@ class BackfillJob(BaseJob):
                         continue
 
                     if ti.state == State.UPSTREAM_FAILED:
-                        self.logger.error("Task instance %s upstream failed", ti)
+                        self.log.error("Task instance %s upstream failed", ti)
                         ti_status.failed.add(key)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
@@ -2225,14 +2225,14 @@ class BackfillJob(BaseJob):
 
                     # special case
                     if ti.state == State.UP_FOR_RETRY:
-                        self.logger.debug("Task instance %s retry period not expired yet", ti)
+                        self.log.debug("Task instance %s retry period not expired yet", ti)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         ti_status.to_run[key] = ti
                         continue
 
                     # all remaining tasks
-                    self.logger.debug('Adding %s to not_ready', ti)
+                    self.log.debug('Adding %s to not_ready', ti)
                     ti_status.not_ready.add(key)
 
             # execute the tasks in the queue
@@ -2245,7 +2245,7 @@ class BackfillJob(BaseJob):
             if (ti_status.not_ready and
                     ti_status.not_ready == set(ti_status.to_run) and
                     len(ti_status.started) == 0):
-                self.logger.warning(
+                self.log.warning(
                     "Deadlock discovered for ti_status.to_run=%s",
                     ti_status.to_run.values()
                 )
@@ -2364,7 +2364,7 @@ class BackfillJob(BaseJob):
         run_dates = self.dag.get_run_dates(start_date=start_date,
                                            end_date=self.bf_end_date)
         if len(run_dates) == 0:
-            self.logger.info("No run dates were found for the given dates and dag interval.")
+            self.log.info("No run dates were found for the given dates and dag interval.")
             return
 
         # picklin'
@@ -2402,7 +2402,7 @@ class BackfillJob(BaseJob):
                     raise AirflowException(err)
 
                 if remaining_dates > 0:
-                    self.logger.info(
+                    self.log.info(
                         "max_active_runs limit for dag %s has been reached "
                         " - waiting for other dag runs to finish",
                         self.dag_id
@@ -2413,7 +2413,7 @@ class BackfillJob(BaseJob):
             session.commit()
             session.close()
 
-        self.logger.info("Backfill done. Exiting.")
+        self.log.info("Backfill done. Exiting.")
 
 
 class LocalTaskJob(BaseJob):
@@ -2453,7 +2453,7 @@ class LocalTaskJob(BaseJob):
 
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
-            self.logger.error("Killing subprocess")
+            self.log.error("Killing subprocess")
             self.on_kill()
             raise AirflowException("LocalTaskJob received SIGTERM signal")
         signal.signal(signal.SIGTERM, signal_handler)
@@ -2466,7 +2466,7 @@ class LocalTaskJob(BaseJob):
                 ignore_ti_state=self.ignore_ti_state,
                 job_id=self.id,
                 pool=self.pool):
-            self.logger.info("Task is not able to be run")
+            self.log.info("Task is not able to be run")
             return
 
         try:
@@ -2479,7 +2479,7 @@ class LocalTaskJob(BaseJob):
                 # Monitor the task to see if it's done
                 return_code = self.task_runner.return_code()
                 if return_code is not None:
-                    self.logger.info("Task exited with return code %s", return_code)
+                    self.log.info("Task exited with return code %s", return_code)
                     return
 
                 # Periodically heartbeat so that the scheduler doesn't think this
@@ -2489,7 +2489,7 @@ class LocalTaskJob(BaseJob):
                     last_heartbeat_time = time.time()
                 except OperationalError:
                     Stats.incr('local_task_job_heartbeat_failure', 1, 1)
-                    self.logger.exception(
+                    self.log.exception(
                         "Exception while trying to heartbeat! Sleeping for %s seconds",
                         self.heartrate
                     )
@@ -2500,7 +2500,7 @@ class LocalTaskJob(BaseJob):
                 time_since_last_heartbeat = time.time() - last_heartbeat_time
                 if time_since_last_heartbeat > heartbeat_time_limit:
                     Stats.incr('local_task_job_prolonged_heartbeat_failure', 1, 1)
-                    self.logger.error("Heartbeat time limited exceeded!")
+                    self.log.error("Heartbeat time limited exceeded!")
                     raise AirflowException("Time since last heartbeat({:.2f}s) "
                                            "exceeded limit ({}s)."
                                            .format(time_since_last_heartbeat,
@@ -2530,18 +2530,18 @@ class LocalTaskJob(BaseJob):
 
         if ti.state == State.RUNNING:
             if not same_hostname:
-                self.logger.warning("The recorded hostname {ti.hostname} "
+                self.log.warning("The recorded hostname {ti.hostname} "
                                 "does not match this instance's hostname "
                                 "{fqdn}".format(**locals()))
                 raise AirflowException("Hostname of job runner does not match")
             elif not same_process:
                 current_pid = os.getpid()
-                self.logger.warning("Recorded pid {ti.pid} does not match the current pid "
+                self.log.warning("Recorded pid {ti.pid} does not match the current pid "
                                 "{current_pid}".format(**locals()))
                 raise AirflowException("PID of job runner does not match")
         elif (self.task_runner.return_code() is None
               and hasattr(self.task_runner, 'process')):
-            self.logger.warning(
+            self.log.warning(
                 "State of this instance has been externally set to %s. Taking the poison pill.",
                 ti.state
             )


[4/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log

Posted by bo...@apache.org.
[AIRFLOW-1604] Rename logger to log

In most popular languages, `log` is the de facto
standard name for the logging variable. Rename
LoggingMixin.py to logging_mixin.py to comply with
Python's lowercase module naming convention.

Accessing the old .logger property still works, but
emits a deprecation warning.

Closes #2604 from Fokko/AIRFLOW-1604-logger-to-log


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb2f5890
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb2f5890
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb2f5890

Branch: refs/heads/master
Commit: eb2f589099b87743482c2eb16261b49e284dcd96
Parents: 8e253c7
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Tue Sep 19 10:17:14 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Sep 19 10:17:14 2017 +0200

----------------------------------------------------------------------
 airflow/__init__.py                             |   4 +-
 airflow/api/__init__.py                         |   4 +-
 airflow/api/auth/backend/kerberos_auth.py       |   4 +-
 airflow/bin/cli.py                              |  10 +-
 airflow/configuration.py                        |   4 +-
 .../auth/backends/github_enterprise_auth.py     |   4 +-
 airflow/contrib/auth/backends/google_auth.py    |   4 +-
 airflow/contrib/auth/backends/kerberos_auth.py  |   2 +-
 airflow/contrib/auth/backends/ldap_auth.py      |   4 +-
 airflow/contrib/auth/backends/password_auth.py  |   4 +-
 airflow/contrib/executors/mesos_executor.py     |  34 +--
 airflow/contrib/hooks/bigquery_hook.py          |  24 +--
 airflow/contrib/hooks/cloudant_hook.py          |   4 +-
 airflow/contrib/hooks/databricks_hook.py        |   8 +-
 airflow/contrib/hooks/datadog_hook.py           |   6 +-
 airflow/contrib/hooks/datastore_hook.py         |   4 +-
 airflow/contrib/hooks/ftp_hook.py               |   6 +-
 airflow/contrib/hooks/gcp_api_base_hook.py      |   6 +-
 airflow/contrib/hooks/gcp_dataflow_hook.py      |  14 +-
 airflow/contrib/hooks/gcp_dataproc_hook.py      |  18 +-
 airflow/contrib/hooks/gcp_mlengine_hook.py      |  18 +-
 airflow/contrib/hooks/gcs_hook.py               |   4 +-
 airflow/contrib/hooks/jira_hook.py              |   4 +-
 airflow/contrib/hooks/qubole_hook.py            |  12 +-
 airflow/contrib/hooks/redis_hook.py             |   6 +-
 airflow/contrib/hooks/salesforce_hook.py        |  14 +-
 airflow/contrib/hooks/spark_sql_hook.py         |   6 +-
 airflow/contrib/hooks/spark_submit_hook.py      |  14 +-
 airflow/contrib/hooks/sqoop_hook.py             |  10 +-
 airflow/contrib/hooks/ssh_hook.py               |  14 +-
 airflow/contrib/operators/bigquery_operator.py  |   2 +-
 .../operators/bigquery_table_delete_operator.py |   2 +-
 .../contrib/operators/bigquery_to_bigquery.py   |   2 +-
 airflow/contrib/operators/bigquery_to_gcs.py    |   6 +-
 .../contrib/operators/databricks_operator.py    |  12 +-
 airflow/contrib/operators/dataproc_operator.py  |  16 +-
 .../operators/datastore_export_operator.py      |   2 +-
 .../operators/datastore_import_operator.py      |   2 +-
 airflow/contrib/operators/ecs_operator.py       |  12 +-
 .../contrib/operators/emr_add_steps_operator.py |   4 +-
 .../operators/emr_create_job_flow_operator.py   |   4 +-
 .../emr_terminate_job_flow_operator.py          |   4 +-
 airflow/contrib/operators/file_to_wasb.py       |   2 +-
 airflow/contrib/operators/fs_operator.py        |   2 +-
 .../contrib/operators/gcs_download_operator.py  |   4 +-
 airflow/contrib/operators/gcs_to_bq.py          |   2 +-
 airflow/contrib/operators/hipchat_operator.py   |   4 +-
 airflow/contrib/operators/mlengine_operator.py  |  14 +-
 airflow/contrib/operators/mysql_to_gcs.py       |   2 +-
 airflow/contrib/operators/sftp_operator.py      |   4 +-
 airflow/contrib/operators/vertica_operator.py   |   2 +-
 airflow/contrib/operators/vertica_to_hive.py    |   4 +-
 airflow/contrib/sensors/bigquery_sensor.py      |   2 +-
 airflow/contrib/sensors/datadog_sensor.py       |   2 +-
 airflow/contrib/sensors/emr_base_sensor.py      |   4 +-
 airflow/contrib/sensors/emr_job_flow_sensor.py  |   2 +-
 airflow/contrib/sensors/emr_step_sensor.py      |   2 +-
 airflow/contrib/sensors/ftp_sensor.py           |   2 +-
 airflow/contrib/sensors/gcs_sensor.py           |   4 +-
 airflow/contrib/sensors/hdfs_sensors.py         |   6 +-
 airflow/contrib/sensors/jira_sensor.py          |  14 +-
 airflow/contrib/sensors/redis_key_sensor.py     |   2 +-
 airflow/contrib/sensors/wasb_sensor.py          |   4 +-
 .../contrib/task_runner/cgroup_task_runner.py   |  20 +-
 airflow/executors/__init__.py                   |   4 +-
 airflow/executors/base_executor.py              |  14 +-
 airflow/executors/celery_executor.py            |  16 +-
 airflow/executors/dask_executor.py              |   4 +-
 airflow/executors/local_executor.py             |   6 +-
 airflow/executors/sequential_executor.py        |   4 +-
 airflow/hooks/S3_hook.py                        |  10 +-
 airflow/hooks/base_hook.py                      |   4 +-
 airflow/hooks/dbapi_hook.py                     |   6 +-
 airflow/hooks/druid_hook.py                     |   4 +-
 airflow/hooks/hive_hooks.py                     |  26 +--
 airflow/hooks/http_hook.py                      |   6 +-
 airflow/hooks/oracle_hook.py                    |   8 +-
 airflow/hooks/pig_hook.py                       |   4 +-
 airflow/hooks/webhdfs_hook.py                   |  12 +-
 airflow/hooks/zendesk_hook.py                   |   4 +-
 airflow/jobs.py                                 | 206 +++++++++----------
 airflow/models.py                               | 126 ++++++------
 airflow/operators/bash_operator.py              |  14 +-
 airflow/operators/check_operator.py             |  20 +-
 airflow/operators/dagrun_operator.py            |   4 +-
 airflow/operators/docker_operator.py            |  10 +-
 airflow/operators/generic_transfer.py           |  10 +-
 airflow/operators/hive_operator.py              |   2 +-
 airflow/operators/hive_stats_operator.py        |   8 +-
 airflow/operators/hive_to_druid.py              |  10 +-
 airflow/operators/hive_to_mysql.py              |  10 +-
 airflow/operators/hive_to_samba_operator.py     |   4 +-
 airflow/operators/http_operator.py              |   2 +-
 airflow/operators/jdbc_operator.py              |   2 +-
 airflow/operators/latest_only_operator.py       |  12 +-
 airflow/operators/mssql_operator.py             |   2 +-
 airflow/operators/mssql_to_hive.py              |   4 +-
 airflow/operators/mysql_operator.py             |   2 +-
 airflow/operators/mysql_to_hive.py              |   4 +-
 airflow/operators/oracle_operator.py            |   2 +-
 airflow/operators/pig_operator.py               |   2 +-
 airflow/operators/postgres_operator.py          |   2 +-
 airflow/operators/presto_to_mysql.py            |   8 +-
 airflow/operators/python_operator.py            |  36 ++--
 airflow/operators/redshift_to_s3_operator.py    |   6 +-
 airflow/operators/s3_file_transform_operator.py |  12 +-
 airflow/operators/s3_to_hive_operator.py        |  24 +--
 airflow/operators/sensors.py                    |  34 +--
 airflow/operators/slack_operator.py             |   2 +-
 airflow/operators/sqlite_operator.py            |   2 +-
 airflow/plugins_manager.py                      |   4 +-
 airflow/security/kerberos.py                    |   2 +-
 airflow/settings.py                             |   4 +-
 airflow/task_runner/base_task_runner.py         |  10 +-
 airflow/task_runner/bash_task_runner.py         |   2 +-
 airflow/utils/dag_processing.py                 |  18 +-
 airflow/utils/db.py                             |   4 +-
 airflow/utils/email.py                          |   4 +-
 airflow/utils/log/LoggingMixin.py               |  45 ----
 airflow/utils/log/gcs_task_handler.py           |   8 +-
 airflow/utils/log/logging_mixin.py              |  61 ++++++
 airflow/utils/log/s3_task_handler.py            |   8 +-
 airflow/utils/timeout.py                        |  12 +-
 airflow/www/api/experimental/endpoints.py       |   4 +-
 airflow/www/app.py                              |   2 +-
 scripts/perf/scheduler_ops_metrics.py           |   4 +-
 tests/contrib/hooks/test_databricks_hook.py     |   2 +-
 .../contrib/operators/test_dataproc_operator.py |   8 +-
 tests/contrib/sensors/test_hdfs_sensors.py      |  62 +++---
 tests/executors/test_executor.py                |   4 +-
 tests/operators/sensors.py                      |   4 +-
 tests/test_utils/reset_warning_registry.py      |  82 ++++++++
 tests/utils/log/test_logging.py                 |   6 +-
 tests/utils/test_logging_mixin.py               |  50 +++++
 134 files changed, 858 insertions(+), 708 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 8844eeb..3c5f24c 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -21,7 +21,7 @@ in their PYTHONPATH. airflow_login should be based off the
 """
 from builtins import object
 from airflow import version
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 __version__ = version.version
 
@@ -41,7 +41,7 @@ login = None
 
 
 def load_login():
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
 
     auth_backend = 'airflow.default_login'
     try:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/api/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py
index 39edbed..31a303b 100644
--- a/airflow/api/__init__.py
+++ b/airflow/api/__init__.py
@@ -17,11 +17,11 @@ from airflow.exceptions import AirflowException
 from airflow import configuration as conf
 from importlib import import_module
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 api_auth = None
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def load_auth():

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/api/auth/backend/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py
index 73a5aa2..a904d59 100644
--- a/airflow/api/auth/backend/kerberos_auth.py
+++ b/airflow/api/auth/backend/kerberos_auth.py
@@ -24,7 +24,7 @@
 
 from future.standard_library import install_aliases
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 install_aliases()
 
@@ -47,7 +47,7 @@ client_auth = HTTPKerberosAuth(service='airflow')
 
 _SERVICE_NAME = None
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def init_app(app):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 56f1855..5035a66 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -53,7 +53,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
 
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.www.app import cached_app
 
 from sqlalchemy import func
@@ -64,7 +64,7 @@ api_module = import_module(conf.get('cli', 'api_client'))
 api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
                                auth=api.api_auth.client_auth)
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def sigint_handler(sig, frame):
@@ -189,7 +189,7 @@ def trigger_dag(args):
     :param args:
     :return:
     """
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
     try:
         message = api_client.trigger_dag(dag_id=args.dag_id,
                                          run_id=args.run_id,
@@ -202,7 +202,7 @@ def trigger_dag(args):
 
 
 def pool(args):
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
 
     def _tabulate(pools):
         return "\n%s" % tabulate(pools, ['Pool', 'Slots', 'Description'],
@@ -330,7 +330,7 @@ def run(args, dag=None):
     if dag:
         args.dag_id = dag.dag_id
 
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
 
     # Load custom airflow config
     if args.cfg_path:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index db196f9..ff81d98 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -28,7 +28,7 @@ import sys
 
 from future import standard_library
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 standard_library.install_aliases()
 
@@ -38,7 +38,7 @@ from six.moves import configparser
 
 from airflow.exceptions import AirflowConfigException
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 # show Airflow's deprecation warnings
 warnings.filterwarnings(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/github_enterprise_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py
index 459e9c9..28c3cfc 100644
--- a/airflow/contrib/auth/backends/github_enterprise_auth.py
+++ b/airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -27,9 +27,9 @@ from flask_oauthlib.client import OAuth
 
 from airflow import models, configuration, settings
 from airflow.configuration import AirflowConfigException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def get_config_param(param):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/google_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py
index f38f725..e6eab94 100644
--- a/airflow/contrib/auth/backends/google_auth.py
+++ b/airflow/contrib/auth/backends/google_auth.py
@@ -26,9 +26,9 @@ from flask import url_for, redirect, request
 from flask_oauthlib.client import OAuth
 
 from airflow import models, configuration, settings
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def get_config_param(param):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py
index ffb711f..908ebc9 100644
--- a/airflow/contrib/auth/backends/kerberos_auth.py
+++ b/airflow/contrib/auth/backends/kerberos_auth.py
@@ -29,7 +29,7 @@ from flask import url_for, redirect
 from airflow import settings
 from airflow import models
 from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 login_manager = flask_login.LoginManager()
 login_manager.login_view = 'airflow.login'  # Calls login() below

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/ldap_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py
index 8ce0875..b056851 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -33,13 +33,13 @@ from airflow.configuration import AirflowConfigException
 import traceback
 import re
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 login_manager = flask_login.LoginManager()
 login_manager.login_view = 'airflow.login'  # Calls login() below
 login_manager.login_message = None
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 class AuthenticationError(Exception):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/password_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/password_auth.py b/airflow/contrib/auth/backends/password_auth.py
index 3ad2a8b..8adb1f4 100644
--- a/airflow/contrib/auth/backends/password_auth.py
+++ b/airflow/contrib/auth/backends/password_auth.py
@@ -32,13 +32,13 @@ from sqlalchemy.ext.hybrid import hybrid_property
 
 from airflow import settings
 from airflow import models
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 login_manager = flask_login.LoginManager()
 login_manager.login_view = 'airflow.login'  # Calls login() below
 login_manager.login_message = None
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 PY3 = version_info[0] == 3
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/executors/mesos_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py
index 19d72ed..8728566 100644
--- a/airflow/contrib/executors/mesos_executor.py
+++ b/airflow/contrib/executors/mesos_executor.py
@@ -14,7 +14,7 @@
 
 from future import standard_library
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.www.utils import LoginMixin
 
 standard_library.install_aliases()
@@ -65,7 +65,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
         self.task_key_map = {}
 
     def registered(self, driver, frameworkId, masterInfo):
-        self.logger.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value)
+        self.log.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value)
 
         if configuration.getboolean('mesos', 'CHECKPOINT') and configuration.get('mesos', 'FAILOVER_TIMEOUT'):
             # Import here to work around a circular import error
@@ -86,25 +86,25 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
             Session.remove()
 
     def reregistered(self, driver, masterInfo):
-        self.logger.info("AirflowScheduler re-registered to mesos")
+        self.log.info("AirflowScheduler re-registered to mesos")
 
     def disconnected(self, driver):
-        self.logger.info("AirflowScheduler disconnected from mesos")
+        self.log.info("AirflowScheduler disconnected from mesos")
 
     def offerRescinded(self, driver, offerId):
-        self.logger.info("AirflowScheduler offer %s rescinded", str(offerId))
+        self.log.info("AirflowScheduler offer %s rescinded", str(offerId))
 
     def frameworkMessage(self, driver, executorId, slaveId, message):
-        self.logger.info("AirflowScheduler received framework message %s", message)
+        self.log.info("AirflowScheduler received framework message %s", message)
 
     def executorLost(self, driver, executorId, slaveId, status):
-        self.logger.warning("AirflowScheduler executor %s lost", str(executorId))
+        self.log.warning("AirflowScheduler executor %s lost", str(executorId))
 
     def slaveLost(self, driver, slaveId):
-        self.logger.warning("AirflowScheduler slave %s lost", str(slaveId))
+        self.log.warning("AirflowScheduler slave %s lost", str(slaveId))
 
     def error(self, driver, message):
-        self.logger.error("AirflowScheduler driver aborted %s", message)
+        self.log.error("AirflowScheduler driver aborted %s", message)
         raise AirflowException("AirflowScheduler driver aborted %s" % message)
 
     def resourceOffers(self, driver, offers):
@@ -118,7 +118,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
                 elif resource.name == "mem":
                     offerMem += resource.scalar.value
 
-            self.logger.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
+            self.log.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
 
             remainingCpus = offerCpus
             remainingMem = offerMem
@@ -131,7 +131,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
                 self.task_counter += 1
                 self.task_key_map[str(tid)] = key
 
-                self.logger.info("Launching task %d using offer %s", tid, offer.id.value)
+                self.log.info("Launching task %d using offer %s", tid, offer.id.value)
 
                 task = mesos_pb2.TaskInfo()
                 task.task_id.value = str(tid)
@@ -161,7 +161,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
             driver.launchTasks(offer.id, tasks)
 
     def statusUpdate(self, driver, update):
-        self.logger.info(
+        self.log.info(
             "Task %s is in state %s, data %s",
             update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data)
         )
@@ -171,7 +171,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
         except KeyError:
             # The map may not contain an item if the framework re-registered after a failover.
             # Discard these tasks.
-            self.logger.warning("Unrecognised task key %s", update.task_id.value)
+            self.log.warning("Unrecognised task key %s", update.task_id.value)
             return
 
         if update.state == mesos_pb2.TASK_FINISHED:
@@ -203,7 +203,7 @@ class MesosExecutor(BaseExecutor, LoginMixin):
         framework.user = ''
 
         if not configuration.get('mesos', 'MASTER'):
-            self.logger.error("Expecting mesos master URL for mesos executor")
+            self.log.error("Expecting mesos master URL for mesos executor")
             raise AirflowException("mesos.master not provided for mesos executor")
 
         master = configuration.get('mesos', 'MASTER')
@@ -239,7 +239,7 @@ class MesosExecutor(BaseExecutor, LoginMixin):
         else:
             framework.checkpoint = False
 
-        self.logger.info(
+        self.log.info(
             'MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s',
             master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint)
         )
@@ -248,10 +248,10 @@ class MesosExecutor(BaseExecutor, LoginMixin):
 
         if configuration.getboolean('mesos', 'AUTHENTICATE'):
             if not configuration.get('mesos', 'DEFAULT_PRINCIPAL'):
-                self.logger.error("Expecting authentication principal in the environment")
+                self.log.error("Expecting authentication principal in the environment")
                 raise AirflowException("mesos.default_principal not provided in authenticated mode")
             if not configuration.get('mesos', 'DEFAULT_SECRET'):
-                self.logger.error("Expecting authentication secret in the environment")
+                self.log.error("Expecting authentication secret in the environment")
                 raise AirflowException("mesos.default_secret not provided in authenticated mode")
 
             credential = mesos_pb2.Credential()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 497fa28..5fc7e22 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -32,7 +32,7 @@ from past.builtins import basestring
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 from airflow.hooks.dbapi_hook import DbApiHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
@@ -499,7 +499,7 @@ class BigQueryBaseCursor(LoggingMixin):
                     "'WRITE_APPEND' or 'WRITE_TRUNCATE'."
                 )
             else:
-                self.logger.info(
+                self.log.info(
                     "Adding experimental "
                     "'schemaUpdateOptions': {0}".format(schema_update_options)
                 )
@@ -576,12 +576,12 @@ class BigQueryBaseCursor(LoggingMixin):
                             )
                         )
                 else:
-                    self.logger.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
+                    self.log.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
                     time.sleep(5)
 
             except HttpError as err:
                 if err.resp.status in [500, 503]:
-                    self.logger.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
+                    self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
                     time.sleep(5)
                 else:
                     raise Exception(
@@ -660,14 +660,14 @@ class BigQueryBaseCursor(LoggingMixin):
                         datasetId=deletion_dataset,
                         tableId=deletion_table) \
                 .execute()
-            self.logger.info('Deleted table %s:%s.%s.',
-                         deletion_project, deletion_dataset, deletion_table)
+            self.log.info('Deleted table %s:%s.%s.',
+                          deletion_project, deletion_dataset, deletion_table)
         except HttpError:
             if not ignore_if_missing:
                 raise Exception(
                     'Table deletion failed. Table does not exist.')
             else:
-                self.logger.info('Table does not exist. Skipping.')
+                self.log.info('Table does not exist. Skipping.')
 
 
     def run_table_upsert(self, dataset_id, table_resource, project_id=None):
@@ -694,7 +694,7 @@ class BigQueryBaseCursor(LoggingMixin):
             for table in tables_list_resp.get('tables', []):
                 if table['tableReference']['tableId'] == table_id:
                     # found the table, do update
-                    self.logger.info(
+                    self.log.info(
                         'Table %s:%s.%s exists, updating.',
                         project_id, dataset_id, table_id
                     )
@@ -712,7 +712,7 @@ class BigQueryBaseCursor(LoggingMixin):
             # If there is no next page, then the table doesn't exist.
             else:
                 # do insert
-                self.logger.info(
+                self.log.info(
                     'Table %s:%s.%s does not exist. creating.',
                     project_id, dataset_id, table_id
                 )
@@ -759,7 +759,7 @@ class BigQueryBaseCursor(LoggingMixin):
                                 'tableId': view_table}}
         # check to see if the view we want to add already exists.
         if view_access not in access:
-            self.logger.info(
+            self.log.info(
                 'Granting table %s:%s.%s authorized view access to %s:%s dataset.',
                 view_project, view_dataset, view_table, source_project, source_dataset
             )
@@ -769,7 +769,7 @@ class BigQueryBaseCursor(LoggingMixin):
                                                  body={'access': access}).execute()
         else:
             # if view is already in access, do nothing.
-            self.logger.info(
+            self.log.info(
                 'Table %s:%s.%s already has authorized view access to %s:%s dataset.',
                 view_project, view_dataset, view_table, source_project, source_dataset
             )
@@ -1032,7 +1032,7 @@ def _split_tablename(table_input, default_project_id, var_name=None):
 
     if project_id is None:
         if var_name is not None:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.info(
                 'Project not included in {var}: {input}; using project "{project}"'.format(
                     var=var_name, input=table_input, project=default_project_id

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/cloudant_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/cloudant_hook.py b/airflow/contrib/hooks/cloudant_hook.py
index d9db08d..cbb0cca 100644
--- a/airflow/contrib/hooks/cloudant_hook.py
+++ b/airflow/contrib/hooks/cloudant_hook.py
@@ -18,7 +18,7 @@ import cloudant
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class CloudantHook(BaseHook):
@@ -35,7 +35,7 @@ class CloudantHook(BaseHook):
         def _str(s):
             # cloudant-python doesn't support unicode.
             if isinstance(s, unicode):
-                log = LoggingMixin().logger
+                log = LoggingMixin().log
                 log.debug(
                     'cloudant-python does not support unicode. Encoding %s as ascii using "ignore".',
                     s

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/databricks_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/databricks_hook.py b/airflow/contrib/hooks/databricks_hook.py
index 7b20433..cd9dc54 100644
--- a/airflow/contrib/hooks/databricks_hook.py
+++ b/airflow/contrib/hooks/databricks_hook.py
@@ -20,7 +20,7 @@ from airflow.hooks.base_hook import BaseHook
 from requests import exceptions as requests_exceptions
 from requests.auth import AuthBase
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 try:
     from urllib import parse as urlparse
@@ -100,10 +100,10 @@ class DatabricksHook(BaseHook, LoggingMixin):
             host=self._parse_host(self.databricks_conn.host),
             endpoint=endpoint)
         if 'token' in self.databricks_conn.extra_dejson:
-            self.logger.info('Using token auth.')
+            self.log.info('Using token auth.')
             auth = _TokenAuth(self.databricks_conn.extra_dejson['token'])
         else:
-            self.logger.info('Using basic auth.')
+            self.log.info('Using basic auth.')
             auth = (self.databricks_conn.login, self.databricks_conn.password)
         if method == 'GET':
             request_func = requests.get
@@ -129,7 +129,7 @@ class DatabricksHook(BaseHook, LoggingMixin):
                         response.content, response.status_code))
             except (requests_exceptions.ConnectionError,
                     requests_exceptions.Timeout) as e:
-                self.logger.error(
+                self.log.error(
                     'Attempt %s API Request to Databricks failed with reason: %s',
                     attempt_num, e
                 )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/datadog_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datadog_hook.py b/airflow/contrib/hooks/datadog_hook.py
index 0f5af00..6caf611 100644
--- a/airflow/contrib/hooks/datadog_hook.py
+++ b/airflow/contrib/hooks/datadog_hook.py
@@ -17,7 +17,7 @@ from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
 from datadog import initialize, api
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class DatadogHook(BaseHook, LoggingMixin):
@@ -47,7 +47,7 @@ class DatadogHook(BaseHook, LoggingMixin):
         if self.app_key is None:
             raise AirflowException("app_key must be specified in the Datadog connection details")
 
-        self.logger.info("Setting up api keys for Datadog")
+        self.log.info("Setting up api keys for Datadog")
         options = {
             'api_key': self.api_key,
             'app_key': self.app_key
@@ -56,7 +56,7 @@ class DatadogHook(BaseHook, LoggingMixin):
 
     def validate_response(self, response):
         if response['status'] != 'ok':
-            self.logger.error("Datadog returned: %s", response)
+            self.log.error("Datadog returned: %s", response)
             raise AirflowException("Error status received from Datadog")
 
     def send_metric(self, metric_name, datapoint, tags=None):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/datastore_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datastore_hook.py b/airflow/contrib/hooks/datastore_hook.py
index 2ff1600..cf98dc7 100644
--- a/airflow/contrib/hooks/datastore_hook.py
+++ b/airflow/contrib/hooks/datastore_hook.py
@@ -136,8 +136,8 @@ class DatastoreHook(GoogleCloudBaseHook):
             result = self.get_operation(name)
             state = result['metadata']['common']['state']
             if state == 'PROCESSING':
-                self.logger.info('Operation is processing. Re-polling state in {} seconds'
-                        .format(polling_interval_in_seconds))
+                self.log.info('Operation is processing. Re-polling state in {} seconds'
+                              .format(polling_interval_in_seconds))
                 time.sleep(polling_interval_in_seconds)
             else:
                 return result

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/ftp_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ftp_hook.py b/airflow/contrib/hooks/ftp_hook.py
index a6b3181..b1e224d 100644
--- a/airflow/contrib/hooks/ftp_hook.py
+++ b/airflow/contrib/hooks/ftp_hook.py
@@ -19,7 +19,7 @@ import os.path
 from airflow.hooks.base_hook import BaseHook
 from past.builtins import basestring
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 def mlsd(conn, path="", facts=None):
@@ -167,9 +167,9 @@ class FTPHook(BaseHook, LoggingMixin):
 
         remote_path, remote_file_name = os.path.split(remote_full_path)
         conn.cwd(remote_path)
-        self.logger.info('Retrieving file from FTP: %s', remote_full_path)
+        self.log.info('Retrieving file from FTP: %s', remote_full_path)
         conn.retrbinary('RETR %s' % remote_file_name, output_handle.write)
-        self.logger.info('Finished retrieving file from FTP: %s', remote_full_path)
+        self.log.info('Finished retrieving file from FTP: %s', remote_full_path)
 
         if is_path:
             output_handle.close()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_api_base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py
index 7476c90..28721d3 100644
--- a/airflow/contrib/hooks/gcp_api_base_hook.py
+++ b/airflow/contrib/hooks/gcp_api_base_hook.py
@@ -18,7 +18,7 @@ from oauth2client.service_account import ServiceAccountCredentials
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class GoogleCloudBaseHook(BaseHook, LoggingMixin):
@@ -66,7 +66,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
             kwargs['sub'] = self.delegate_to
 
         if not key_path:
-            self.logger.info('Getting connection using `gcloud auth` user, since no key file '
+            self.log.info('Getting connection using `gcloud auth` user, since no key file '
                          'is defined for hook.')
             credentials = GoogleCredentials.get_application_default()
         else:
@@ -74,7 +74,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
                 raise AirflowException('Scope should be defined when using a key file.')
             scopes = [s.strip() for s in scope.split(',')]
             if key_path.endswith('.json'):
-                self.logger.info('Getting connection using a JSON key file.')
+                self.log.info('Getting connection using a JSON key file.')
                 credentials = ServiceAccountCredentials\
                     .from_json_keyfile_name(key_path, scopes)
             elif key_path.endswith('.p12'):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index f5767bd..b1a1e0e 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -19,7 +19,7 @@ import uuid
 from apiclient.discovery import build
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class _DataflowJob(LoggingMixin):
@@ -48,12 +48,12 @@ class _DataflowJob(LoggingMixin):
             job = self._dataflow.projects().jobs().get(projectId=self._project_number,
                                                        jobId=self._job_id).execute()
         if 'currentState' in job:
-            self.logger.info(
+            self.log.info(
                 'Google Cloud DataFlow job %s is %s',
                 job['name'], job['currentState']
             )
         else:
-            self.logger.info(
+            self.log.info(
                 'Google Cloud DataFlow with job_id %s has name %s',
                 self._job_id, job['name']
             )
@@ -75,7 +75,7 @@ class _DataflowJob(LoggingMixin):
                 elif 'JOB_STATE_PENDING' == self._job['currentState']:
                     time.sleep(15)
                 else:
-                    self.logger.debug(str(self._job))
+                    self.log.debug(str(self._job))
                     raise Exception(
                         "Google Cloud Dataflow job {} was unknown state: {}".format(
                             self._job['name'], self._job['currentState']))
@@ -109,15 +109,15 @@ class _Dataflow(LoggingMixin):
 
     def wait_for_done(self):
         reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
-        self.logger.info("Start waiting for DataFlow process to complete.")
+        self.log.info("Start waiting for DataFlow process to complete.")
         while self._proc.poll() is None:
             ret = select.select(reads, [], [], 5)
             if ret is not None:
                 for fd in ret[0]:
                     line = self._line(fd)
-                    self.logger.debug(line[:-1])
+                    self.log.debug(line[:-1])
             else:
-                self.logger.info("Waiting for DataFlow process to complete.")
+                self.log.info("Waiting for DataFlow process to complete.")
         if self._proc.returncode is not 0:
             raise Exception("DataFlow failed with return code {}".format(
                 self._proc.returncode))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index 3a1336e..c964f4c 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -18,7 +18,7 @@ import uuid
 from apiclient.discovery import build
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class _DataProcJob(LoggingMixin):
@@ -30,7 +30,7 @@ class _DataProcJob(LoggingMixin):
             region='global',
             body=job).execute()
         self.job_id = self.job['reference']['jobId']
-        self.logger.info(
+        self.log.info(
             'DataProc job %s is %s',
             self.job_id, str(self.job['status']['state'])
         )
@@ -43,20 +43,20 @@ class _DataProcJob(LoggingMixin):
                 jobId=self.job_id).execute()
             if 'ERROR' == self.job['status']['state']:
                 print(str(self.job))
-                self.logger.error('DataProc job %s has errors', self.job_id)
-                self.logger.error(self.job['status']['details'])
-                self.logger.debug(str(self.job))
+                self.log.error('DataProc job %s has errors', self.job_id)
+                self.log.error(self.job['status']['details'])
+                self.log.debug(str(self.job))
                 return False
             if 'CANCELLED' == self.job['status']['state']:
                 print(str(self.job))
-                self.logger.warning('DataProc job %s is cancelled', self.job_id)
+                self.log.warning('DataProc job %s is cancelled', self.job_id)
                 if 'details' in self.job['status']:
-                    self.logger.warning(self.job['status']['details'])
-                self.logger.debug(str(self.job))
+                    self.log.warning(self.job['status']['details'])
+                self.log.debug(str(self.job))
                 return False
             if 'DONE' == self.job['status']['state']:
                 return True
-            self.logger.debug(
+            self.log.debug(
                 'DataProc job %s is %s',
                 self.job_id, str(self.job['status']['state'])
             )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_mlengine_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_mlengine_hook.py b/airflow/contrib/hooks/gcp_mlengine_hook.py
index 35f31a7..c17b614 100644
--- a/airflow/contrib/hooks/gcp_mlengine_hook.py
+++ b/airflow/contrib/hooks/gcp_mlengine_hook.py
@@ -20,11 +20,11 @@ from apiclient.discovery import build
 from oauth2client.client import GoogleCredentials
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 def _poll_with_exponential_delay(request, max_n, is_done_func, is_error_func):
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
 
     for i in range(0, max_n):
         try:
@@ -103,18 +103,18 @@ class MLEngineHook(GoogleCloudBaseHook):
                 if use_existing_job_fn is not None:
                     existing_job = self._get_job(project_id, job_id)
                     if not use_existing_job_fn(existing_job):
-                        self.logger.error(
+                        self.log.error(
                             'Job with job_id %s already exist, but it does '
                             'not match our expectation: %s',
                             job_id, existing_job
                         )
                         raise
-                self.logger.info(
+                self.log.info(
                     'Job with job_id %s already exist. Will waiting for it to finish',
                     job_id
                 )
             else:
-                self.logger.error('Failed to create MLEngine job: {}'.format(e))
+                self.log.error('Failed to create MLEngine job: {}'.format(e))
                 raise
 
         return self._wait_for_job_done(project_id, job_id)
@@ -139,7 +139,7 @@ class MLEngineHook(GoogleCloudBaseHook):
                     # polling after 30 seconds when quota failure occurs
                     time.sleep(30)
                 else:
-                    self.logger.error('Failed to get MLEngine job: {}'.format(e))
+                    self.log.error('Failed to get MLEngine job: {}'.format(e))
                     raise
 
     def _wait_for_job_done(self, project_id, job_id, interval=30):
@@ -191,10 +191,10 @@ class MLEngineHook(GoogleCloudBaseHook):
 
         try:
             response = request.execute()
-            self.logger.info('Successfully set version: %s to default', response)
+            self.log.info('Successfully set version: %s to default', response)
             return response
         except errors.HttpError as e:
-            self.logger.error('Something went wrong: %s', e)
+            self.log.error('Something went wrong: %s', e)
             raise
 
     def list_versions(self, project_id, model_name):
@@ -262,6 +262,6 @@ class MLEngineHook(GoogleCloudBaseHook):
             return request.execute()
         except errors.HttpError as e:
             if e.resp.status == 404:
-                self.logger.error('Model was not found: %s', e)
+                self.log.error('Model was not found: %s', e)
                 return None
             raise

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index eb17c3b..24c247e 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -182,7 +182,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
                     ts = ts.replace(tzinfo=dateutil.tz.tzutc())
 
                 updated = dateutil.parser.parse(response['updated'])
-                self.logger.info("Verify object date: %s > %s", updated, ts)
+                self.log.info("Verify object date: %s > %s", updated, ts)
 
                 if updated > ts:
                     return True
@@ -247,7 +247,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
             ).execute()
 
             if 'items' not in response:
-                self.logger.info("No items found for prefix: %s", prefix)
+                self.log.info("No items found for prefix: %s", prefix)
                 break
 
             for item in response['items']:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/jira_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/jira_hook.py b/airflow/contrib/hooks/jira_hook.py
index 8702608..21e669f 100644
--- a/airflow/contrib/hooks/jira_hook.py
+++ b/airflow/contrib/hooks/jira_hook.py
@@ -16,7 +16,7 @@ from jira.exceptions import JIRAError
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class JiraHook(BaseHook, LoggingMixin):
@@ -35,7 +35,7 @@ class JiraHook(BaseHook, LoggingMixin):
 
     def get_conn(self):
         if not self.client:
-            self.logger.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
+            self.log.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
 
             get_server_info = True
             validate = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 1a5e7ec..833c1c7 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -21,7 +21,7 @@ import six
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
 from qds_sdk.qubole import Qubole
@@ -86,7 +86,7 @@ class QuboleHook(BaseHook, LoggingMixin):
         if cmd_id is not None:
             cmd = Command.find(cmd_id)
             if cmd is not None:
-                log = LoggingMixin().logger
+                log = LoggingMixin().log
                 if cmd.status == 'done':
                     log.info('Command ID: %s has been succeeded, hence marking this '
                                 'TI as Success.', cmd_id)
@@ -99,7 +99,7 @@ class QuboleHook(BaseHook, LoggingMixin):
         args = self.cls.parse(self.create_cmd_args(context))
         self.cmd = self.cls.create(**args)
         context['task_instance'].xcom_push(key='qbol_cmd_id', value=self.cmd.id)
-        self.logger.info(
+        self.log.info(
             "Qubole command created with Id: %s and Status: %s",
             self.cmd.id, self.cmd.status
         )
@@ -107,10 +107,10 @@ class QuboleHook(BaseHook, LoggingMixin):
         while not Command.is_done(self.cmd.status):
             time.sleep(Qubole.poll_interval)
             self.cmd = self.cls.find(self.cmd.id)
-            self.logger.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
+            self.log.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
 
         if 'fetch_logs' in self.kwargs and self.kwargs['fetch_logs'] is True:
-            self.logger.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
+            self.log.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
 
         if self.cmd.status != 'done':
             raise AirflowException('Command Id: {0} failed with Status: {1}'.format(
@@ -126,7 +126,7 @@ class QuboleHook(BaseHook, LoggingMixin):
             cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=ti.task_id)
             self.cmd = self.cls.find(cmd_id)
         if self.cls and self.cmd:
-            self.logger.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
+            self.log.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
             self.cmd.cancel()
 
     def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/redis_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py
index a8999d6..278e196 100644
--- a/airflow/contrib/hooks/redis_hook.py
+++ b/airflow/contrib/hooks/redis_hook.py
@@ -19,7 +19,7 @@ from redis import StrictRedis
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class RedisHook(BaseHook, LoggingMixin):
@@ -41,7 +41,7 @@ class RedisHook(BaseHook, LoggingMixin):
         self.password = conn.password
         self.db = int(conn.extra_dejson.get('db', 0))
 
-        self.logger.debug(
+        self.log.debug(
             '''Connection "{conn}":
             \thost: {host}
             \tport: {port}
@@ -59,7 +59,7 @@ class RedisHook(BaseHook, LoggingMixin):
         Returns a Redis connection.
         """
         if not self.client:
-            self.logger.debug(
+            self.log.debug(
                 'generating Redis client for conn_id "%s" on %s:%s:%s',
                 self.redis_conn_id, self.host, self.port, self.db
             )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/salesforce_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py
index f2b5fef..0d0a104 100644
--- a/airflow/contrib/hooks/salesforce_hook.py
+++ b/airflow/contrib/hooks/salesforce_hook.py
@@ -29,7 +29,7 @@ import json
 import pandas as pd
 import time
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SalesforceHook(BaseHook, LoggingMixin):
@@ -92,10 +92,10 @@ class SalesforceHook(BaseHook, LoggingMixin):
         """
         self.sign_in()
 
-        self.logger.info("Querying for all objects")
+        self.log.info("Querying for all objects")
         query = self.sf.query_all(query)
 
-        self.logger.info(
+        self.log.info(
             "Received results: Total size: %s; Done: %s",
             query['totalSize'], query['done']
         )
@@ -144,7 +144,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
         field_string = self._build_field_list(fields)
 
         query = "SELECT {0} FROM {1}".format(field_string, obj)
-        self.logger.info(
+        self.log.info(
             "Making query to Salesforce: %s",
             query if len(query) < 30 else " ... ".join([query[:15], query[-15:]])
         )
@@ -169,7 +169,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
         try:
             col = pd.to_datetime(col)
         except ValueError:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.warning(
                 "Could not convert field to timestamps: %s", col.name
             )
@@ -265,7 +265,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
             # for each returned record
             object_name = query_results[0]['attributes']['type']
 
-            self.logger.info("Coercing timestamps for: %s", object_name)
+            self.log.info("Coercing timestamps for: %s", object_name)
 
             schema = self.describe_object(object_name)
 
@@ -299,7 +299,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
             # there are also a ton of newline objects
             # that mess up our ability to write to csv
             # we remove these newlines so that the output is a valid CSV format
-            self.logger.info("Cleaning data and writing to CSV")
+            self.log.info("Cleaning data and writing to CSV")
             possible_strings = df.columns[df.dtypes == "object"]
             df[possible_strings] = df[possible_strings].apply(
                 lambda x: x.str.replace("\r\n", "")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index aa16130..6973023 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -16,7 +16,7 @@ import subprocess
 
 from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SparkSqlHook(BaseHook, LoggingMixin):
@@ -121,7 +121,7 @@ class SparkSqlHook(BaseHook, LoggingMixin):
             connection_cmd += ["--queue", self._yarn_queue]
 
         connection_cmd += cmd
-        self.logger.debug("Spark-Sql cmd: %s", connection_cmd)
+        self.log.debug("Spark-Sql cmd: %s", connection_cmd)
 
         return connection_cmd
 
@@ -151,5 +151,5 @@ class SparkSqlHook(BaseHook, LoggingMixin):
 
     def kill(self):
         if self._sp and self._sp.poll() is None:
-            self.logger.info("Killing the Spark-Sql job")
+            self.log.info("Killing the Spark-Sql job")
             self._sp.kill()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index bdd1efe..7d59cd2 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -18,7 +18,7 @@ import re
 
 from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SparkSubmitHook(BaseHook, LoggingMixin):
@@ -123,7 +123,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             conn_data['spark_home'] = extra.get('spark-home', None)
             conn_data['spark_binary'] = extra.get('spark-binary', 'spark-submit')
         except AirflowException:
-            self.logger.debug(
+            self.log.debug(
                 "Could not load connection string %s, defaulting to %s",
                 self._conn_id, conn_data['master']
             )
@@ -192,7 +192,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         if self._application_args:
             connection_cmd += self._application_args
 
-        self.logger.debug("Spark-Submit cmd: %s", connection_cmd)
+        self.log.debug("Spark-Submit cmd: %s", connection_cmd)
 
         return connection_cmd
 
@@ -239,15 +239,15 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
                     self._yarn_application_id = match.groups()[0]
 
             # Pass to logging
-            self.logger.info(line)
+            self.log.info(line)
 
     def on_kill(self):
         if self._sp and self._sp.poll() is None:
-            self.logger.info('Sending kill signal to %s', self._connection['spark_binary'])
+            self.log.info('Sending kill signal to %s', self._connection['spark_binary'])
             self._sp.kill()
 
             if self._yarn_application_id:
-                self.logger.info('Killing application on YARN')
+                self.log.info('Killing application on YARN')
                 kill_cmd = "yarn application -kill {0}".format(self._yarn_application_id).split()
                 yarn_kill = subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-                self.logger.info("YARN killed with return code: %s", yarn_kill.wait())
+                self.log.info("YARN killed with return code: %s", yarn_kill.wait())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 0584df4..5b00b15 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -20,7 +20,7 @@ import subprocess
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SqoopHook(BaseHook, LoggingMixin):
@@ -76,7 +76,7 @@ class SqoopHook(BaseHook, LoggingMixin):
             password_index = cmd.index('--password')
             cmd[password_index + 1] = 'MASKED'
         except ValueError:
-            self.logger.debug("No password in sqoop cmd")
+            self.log.debug("No password in sqoop cmd")
         return cmd
 
     def Popen(self, cmd, **kwargs):
@@ -87,18 +87,18 @@ class SqoopHook(BaseHook, LoggingMixin):
         :param kwargs: extra arguments to Popen (see subprocess.Popen)
         :return: handle to subprocess
         """
-        self.logger.info("Executing command: %s", ' '.join(cmd))
+        self.log.info("Executing command: %s", ' '.join(cmd))
         sp = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               **kwargs)
 
         for line in iter(sp.stdout):
-            self.logger.info(line.strip())
+            self.log.info(line.strip())
 
         sp.wait()
 
-        self.logger.info("Command exited with return code %s", sp.returncode)
+        self.log.info("Command exited with return code %s", sp.returncode)
 
         if sp.returncode:
             raise AirflowException("Sqoop command failed: %s", ' '.join(cmd))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/ssh_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py
index 3fe9146..b061fd7 100755
--- a/airflow/contrib/hooks/ssh_hook.py
+++ b/airflow/contrib/hooks/ssh_hook.py
@@ -23,7 +23,7 @@ import paramiko
 from contextlib import contextmanager
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SSHHook(BaseHook, LoggingMixin):
@@ -70,7 +70,7 @@ class SSHHook(BaseHook, LoggingMixin):
 
     def get_conn(self):
         if not self.client:
-            self.logger.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
+            self.log.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
             if self.ssh_conn_id is not None:
                 conn = self.get_connection(self.ssh_conn_id)
                 if self.username is None:
@@ -98,7 +98,7 @@ class SSHHook(BaseHook, LoggingMixin):
 
             # Auto detecting username values from system
             if not self.username:
-                self.logger.debug(
+                self.log.debug(
                     "username to ssh to host: %s is not specified for connection id"
                     " %s. Using system's default provided by getpass.getuser()",
                     self.remote_host, self.ssh_conn_id
@@ -142,17 +142,17 @@ class SSHHook(BaseHook, LoggingMixin):
 
                 self.client = client
             except paramiko.AuthenticationException as auth_error:
-                self.logger.error(
+                self.log.error(
                     "Auth failed while connecting to host: %s, error: %s",
                     self.remote_host, auth_error
                 )
             except paramiko.SSHException as ssh_error:
-                self.logger.error(
+                self.log.error(
                     "Failed connecting to host: %s, error: %s",
                     self.remote_host, ssh_error
                 )
             except Exception as error:
-                self.logger.error(
+                self.log.error(
                     "Error connecting to host: %s, error: %s",
                     self.remote_host, error
                 )
@@ -191,7 +191,7 @@ class SSHHook(BaseHook, LoggingMixin):
                           ]
 
         ssh_cmd += ssh_tunnel_cmd
-        self.logger.debug("Creating tunnel with cmd: %s", ssh_cmd)
+        self.log.debug("Creating tunnel with cmd: %s", ssh_cmd)
 
         proc = subprocess.Popen(ssh_cmd,
                                 stdin=subprocess.PIPE,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 37e4a97..a2ba824 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -87,7 +87,7 @@ class BigQueryOperator(BaseOperator):
         self.query_params = query_params
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.bql)
+        self.log.info('Executing: %s', self.bql)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                             delegate_to=self.delegate_to)
         conn = hook.get_conn()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_table_delete_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py
index 21de7cc..0f4ef50 100644
--- a/airflow/contrib/operators/bigquery_table_delete_operator.py
+++ b/airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -53,7 +53,7 @@ class BigQueryTableDeleteOperator(BaseOperator):
         self.ignore_if_missing = ignore_if_missing
 
     def execute(self, context):
-        self.logger.info('Deleting: %s', self.deletion_dataset_table)
+        self.log.info('Deleting: %s', self.deletion_dataset_table)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                             delegate_to=self.delegate_to)
         conn = hook.get_conn()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_to_bigquery.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py
index 8e21270..2bc4a8b 100644
--- a/airflow/contrib/operators/bigquery_to_bigquery.py
+++ b/airflow/contrib/operators/bigquery_to_bigquery.py
@@ -68,7 +68,7 @@ class BigQueryToBigQueryOperator(BaseOperator):
         self.delegate_to = delegate_to
 
     def execute(self, context):
-        self.logger.info(
+        self.log.info(
             'Executing copy of %s into: %s',
             self.source_project_dataset_tables, self.destination_project_dataset_table
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py
index 23a2029..800e7bd 100644
--- a/airflow/contrib/operators/bigquery_to_gcs.py
+++ b/airflow/contrib/operators/bigquery_to_gcs.py
@@ -79,9 +79,9 @@ class BigQueryToCloudStorageOperator(BaseOperator):
         self.delegate_to = delegate_to
 
     def execute(self, context):
-        self.logger.info('Executing extract of %s into: %s',
-                     self.source_project_dataset_table,
-                     self.destination_cloud_storage_uris)
+        self.log.info('Executing extract of %s into: %s',
+                      self.source_project_dataset_table,
+                      self.destination_cloud_storage_uris)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                             delegate_to=self.delegate_to)
         conn = hook.get_conn()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index 8773357..cffc4ff 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -214,7 +214,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
             raise AirflowException(msg)
 
     def _log_run_page_url(self, url):
-        self.logger.info('View run status, Spark UI, and logs at %s', url)
+        self.log.info('View run status, Spark UI, and logs at %s', url)
 
     def get_hook(self):
         return DatabricksHook(
@@ -225,13 +225,13 @@ class DatabricksSubmitRunOperator(BaseOperator):
         hook = self.get_hook()
         self.run_id = hook.submit_run(self.json)
         run_page_url = hook.get_run_page_url(self.run_id)
-        self.logger.info('Run submitted with run_id: %s', self.run_id)
+        self.log.info('Run submitted with run_id: %s', self.run_id)
         self._log_run_page_url(run_page_url)
         while True:
             run_state = hook.get_run_state(self.run_id)
             if run_state.is_terminal:
                 if run_state.is_successful:
-                    self.logger.info('%s completed successfully.', self.task_id)
+                    self.log.info('%s completed successfully.', self.task_id)
                     self._log_run_page_url(run_page_url)
                     return
                 else:
@@ -240,15 +240,15 @@ class DatabricksSubmitRunOperator(BaseOperator):
                         s=run_state)
                     raise AirflowException(error_message)
             else:
-                self.logger.info('%s in run state: %s', self.task_id, run_state)
+                self.log.info('%s in run state: %s', self.task_id, run_state)
                 self._log_run_page_url(run_page_url)
-                self.logger.info('Sleeping for %s seconds.', self.polling_period_seconds)
+                self.log.info('Sleeping for %s seconds.', self.polling_period_seconds)
                 time.sleep(self.polling_period_seconds)
 
     def on_kill(self):
         hook = self.get_hook()
         hook.cancel_run(self.run_id)
-        self.logger.info(
+        self.log.info(
             'Task: %s with run_id: %s was requested to be cancelled.',
             self.task_id, self.run_id
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 3c22b60..bdb0335 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -177,12 +177,12 @@ class DataprocClusterCreateOperator(BaseOperator):
         while True:
             state = self._get_cluster_state(service)
             if state is None:
-                self.logger.info("No state for cluster '%s'", self.cluster_name)
+                self.log.info("No state for cluster '%s'", self.cluster_name)
                 time.sleep(15)
             else:
-                self.logger.info("State for cluster '%s' is %s", self.cluster_name, state)
+                self.log.info("State for cluster '%s' is %s", self.cluster_name, state)
                 if self._cluster_ready(state, service):
-                    self.logger.info(
+                    self.log.info(
                         "Cluster '%s' successfully created", self.cluster_name
                     )
                     return
@@ -264,7 +264,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         return cluster_data
 
     def execute(self, context):
-        self.logger.info('Creating cluster: %s', self.cluster_name)
+        self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataProcHook(
             gcp_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to
@@ -272,7 +272,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         service = hook.get_conn()
 
         if self._get_cluster(service):
-            self.logger.info(
+            self.log.info(
                 'Cluster %s already exists... Checking status...',
                 self.cluster_name
             )
@@ -290,7 +290,7 @@ class DataprocClusterCreateOperator(BaseOperator):
             # probably two cluster start commands at the same time
             time.sleep(10)
             if self._get_cluster(service):
-                self.logger.info(
+                self.log.info(
                     'Cluster {} already exists... Checking status...',
                     self.cluster_name
                  )
@@ -358,7 +358,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
             time.sleep(15)
 
     def execute(self, context):
-        self.logger.info('Deleting cluster: %s', self.cluster_name)
+        self.log.info('Deleting cluster: %s', self.cluster_name)
         hook = DataProcHook(
             gcp_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to
@@ -371,7 +371,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
             clusterName=self.cluster_name
         ).execute()
         operation_name = response['name']
-        self.logger.info("Cluster delete operation name: %s", operation_name)
+        self.log.info("Cluster delete operation name: %s", operation_name)
         self._wait_for_done(service, operation_name)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/datastore_export_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py
index 76415e1..51e1d06 100644
--- a/airflow/contrib/operators/datastore_export_operator.py
+++ b/airflow/contrib/operators/datastore_export_operator.py
@@ -78,7 +78,7 @@ class DatastoreExportOperator(BaseOperator):
         self.xcom_push = xcom_push
 
     def execute(self, context):
-        self.logger.info('Exporting data to Cloud Storage bucket ' + self.bucket)
+        self.log.info('Exporting data to Cloud Storage bucket ' + self.bucket)
 
         if self.overwrite_existing and self.namespace:
             gcs_hook = GoogleCloudStorageHook(self.cloud_storage_conn_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/datastore_import_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py
index 74bd940..d8c42e7 100644
--- a/airflow/contrib/operators/datastore_import_operator.py
+++ b/airflow/contrib/operators/datastore_import_operator.py
@@ -72,7 +72,7 @@ class DatastoreImportOperator(BaseOperator):
         self.xcom_push = xcom_push
 
     def execute(self, context):
-        self.logger.info('Importing data from Cloud Storage bucket %s', self.bucket)
+        self.log.info('Importing data from Cloud Storage bucket %s', self.bucket)
         ds_hook = DatastoreHook(self.datastore_conn_id, self.delegate_to)
         result = ds_hook.import_from_storage_bucket(bucket=self.bucket,
                                                     file=self.file,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/ecs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py
index 0c75eaa..898a77a 100644
--- a/airflow/contrib/operators/ecs_operator.py
+++ b/airflow/contrib/operators/ecs_operator.py
@@ -56,11 +56,11 @@ class ECSOperator(BaseOperator):
         self.hook = self.get_hook()
 
     def execute(self, context):
-        self.logger.info(
+        self.log.info(
             'Running ECS Task - Task definition: %s - on cluster %s',
             self.task_definition,self.cluster
         )
-        self.logger.info('ECSOperator overrides: %s', self.overrides)
+        self.log.info('ECSOperator overrides: %s', self.overrides)
 
         self.client = self.hook.get_client_type(
             'ecs',
@@ -77,13 +77,13 @@ class ECSOperator(BaseOperator):
         failures = response['failures']
         if len(failures) > 0:
             raise AirflowException(response)
-        self.logger.info('ECS Task started: %s', response)
+        self.log.info('ECS Task started: %s', response)
 
         self.arn = response['tasks'][0]['taskArn']
         self._wait_for_task_ended()
 
         self._check_success_task()
-        self.logger.info('ECS Task has been successfully executed: %s', response)
+        self.log.info('ECS Task has been successfully executed: %s', response)
 
     def _wait_for_task_ended(self):
         waiter = self.client.get_waiter('tasks_stopped')
@@ -98,7 +98,7 @@ class ECSOperator(BaseOperator):
             cluster=self.cluster,
             tasks=[self.arn]
         )
-        self.logger.info('ECS Task stopped, check status: %s', response)
+        self.log.info('ECS Task stopped, check status: %s', response)
 
         if len(response.get('failures', [])) > 0:
             raise AirflowException(response)
@@ -124,4 +124,4 @@ class ECSOperator(BaseOperator):
             cluster=self.cluster,
             task=self.arn,
             reason='Task killed by the user')
-        self.logger.info(response)
+        self.log.info(response)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py
index dbf764e..227474e 100644
--- a/airflow/contrib/operators/emr_add_steps_operator.py
+++ b/airflow/contrib/operators/emr_add_steps_operator.py
@@ -48,11 +48,11 @@ class EmrAddStepsOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        self.logger.info('Adding steps to %s', self.job_flow_id)
+        self.log.info('Adding steps to %s', self.job_flow_id)
         response = emr.add_job_flow_steps(JobFlowId=self.job_flow_id, Steps=self.steps)
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('Adding steps failed: %s' % response)
         else:
-            self.logger.info('Steps %s added to JobFlow', response['StepIds'])
+            self.log.info('Steps %s added to JobFlow', response['StepIds'])
             return response['StepIds']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py
index 4e40b17..2544adf 100644
--- a/airflow/contrib/operators/emr_create_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_create_job_flow_operator.py
@@ -50,7 +50,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id)
 
-        self.logger.info(
+        self.log.info(
             'Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s',
             self.aws_conn_id, self.emr_conn_id
         )
@@ -59,5 +59,5 @@ class EmrCreateJobFlowOperator(BaseOperator):
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('JobFlow creation failed: %s' % response)
         else:
-            self.logger.info('JobFlow with id %s created', response['JobFlowId'])
+            self.log.info('JobFlow with id %s created', response['JobFlowId'])
             return response['JobFlowId']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
index df641ad..ec29897 100644
--- a/airflow/contrib/operators/emr_terminate_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
@@ -43,10 +43,10 @@ class EmrTerminateJobFlowOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        self.logger.info('Terminating JobFlow %s', self.job_flow_id)
+        self.log.info('Terminating JobFlow %s', self.job_flow_id)
         response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('JobFlow termination failed: %s' % response)
         else:
-            self.logger.info('JobFlow with id %s terminated', self.job_flow_id)
+            self.log.info('JobFlow with id %s terminated', self.job_flow_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/file_to_wasb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py
index 4519e1e..3478dd3 100644
--- a/airflow/contrib/operators/file_to_wasb.py
+++ b/airflow/contrib/operators/file_to_wasb.py
@@ -51,7 +51,7 @@ class FileToWasbOperator(BaseOperator):
     def execute(self, context):
         """Upload a file to Azure Blob Storage."""
         hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
-        self.logger.info(
+        self.log.info(
             'Uploading {self.file_path} to wasb://{self.container_name} as {self.blob_name}'.format(**locals())
         )
         hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/fs_operator.py b/airflow/contrib/operators/fs_operator.py
index ca7d716..e7640c8 100644
--- a/airflow/contrib/operators/fs_operator.py
+++ b/airflow/contrib/operators/fs_operator.py
@@ -48,7 +48,7 @@ class FileSensor(BaseSensorOperator):
         hook = FSHook(self.fs_conn_id)
         basepath = hook.get_path()
         full_path = "/".join([basepath, self.filepath])
-        self.logger.info('Poking for file {full_path}'.format(**locals()))
+        self.log.info('Poking for file {full_path}'.format(**locals()))
         try:
             files = [f for f in walk(full_path)]
         except:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/gcs_download_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py
index 27e85b7..53516b1 100644
--- a/airflow/contrib/operators/gcs_download_operator.py
+++ b/airflow/contrib/operators/gcs_download_operator.py
@@ -65,7 +65,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
         self.delegate_to = delegate_to
 
     def execute(self, context):
-        self.logger.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
+        self.log.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
         hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                                       delegate_to=self.delegate_to)
         file_bytes = hook.download(self.bucket, self.object, self.filename)
@@ -74,4 +74,4 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
                 context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
             else:
                 raise RuntimeError('The size of the downloaded file is too large to push to XCom!')
-        self.logger.info(file_bytes)
+        self.log.info(file_bytes)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 01f53cc..730a3bc 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -189,7 +189,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
                 self.destination_project_dataset_table))
             row = cursor.fetchone()
             max_id = row[0] if row[0] else 0
-            self.logger.info(
+            self.log.info(
                 'Loaded BQ data with max %s.%s=%s',
                 self.destination_project_dataset_table, self.max_id_key, max_id
             )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/hipchat_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/hipchat_operator.py b/airflow/contrib/operators/hipchat_operator.py
index 19c6d76..d82ad61 100644
--- a/airflow/contrib/operators/hipchat_operator.py
+++ b/airflow/contrib/operators/hipchat_operator.py
@@ -66,8 +66,8 @@ class HipChatAPIOperator(BaseOperator):
                                         'Authorization': 'Bearer %s' % self.token},
                                     data=self.body)
         if response.status_code >= 400:
-            self.logger.error('HipChat API call failed: %s %s',
-                          response.status_code, response.reason)
+            self.log.error('HipChat API call failed: %s %s',
+                           response.status_code, response.reason)
             raise AirflowException('HipChat API call failed: %s %s' %
                                    (response.status_code, response.reason))
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/mlengine_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mlengine_operator.py b/airflow/contrib/operators/mlengine_operator.py
index fdbfede..4d8943b 100644
--- a/airflow/contrib/operators/mlengine_operator.py
+++ b/airflow/contrib/operators/mlengine_operator.py
@@ -22,9 +22,9 @@ from airflow.operators import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from apiclient import errors
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def _create_prediction_input(project_id,
@@ -225,7 +225,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
                 model_name, version_name, uri, max_worker_count,
                 runtime_version)
         except ValueError as e:
-            self.logger.error(
+            self.log.error(
                 'Cannot create batch prediction job request due to: %s',
                 e
             )
@@ -251,7 +251,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
             raise
 
         if finished_prediction_job['state'] != 'SUCCEEDED':
-            self.logger.error(
+            self.log.error(
                 'Batch prediction job failed: %s',
                 str(finished_prediction_job))
             raise RuntimeError(finished_prediction_job['errorMessage'])
@@ -538,8 +538,8 @@ class MLEngineTrainingOperator(BaseOperator):
         }
 
         if self._mode == 'DRY_RUN':
-            self.logger.info('In dry_run mode.')
-            self.logger.info('MLEngine Training job request is: {}'.format(training_request))
+            self.log.info('In dry_run mode.')
+            self.log.info('MLEngine Training job request is: {}'.format(training_request))
             return
 
         hook = MLEngineHook(
@@ -557,6 +557,6 @@ class MLEngineTrainingOperator(BaseOperator):
             raise
 
         if finished_training_job['state'] != 'SUCCEEDED':
-            self.logger.error('MLEngine training job failed: {}'.format(
+            self.log.error('MLEngine training job failed: {}'.format(
                 str(finished_training_job)))
             raise RuntimeError(finished_training_job['errorMessage'])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index f7b3a5a..c8ebcd0 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -168,7 +168,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
                 'mode': field_mode,
             })
 
-        self.logger.info('Using schema for %s: %s', self.schema_filename, schema)
+        self.log.info('Using schema for %s: %s', self.schema_filename, schema)
         tmp_schema_file_handle = NamedTemporaryFile(mode='w', delete=True)
         json.dump(schema, tmp_schema_file_handle)
         return {self.schema_filename: tmp_schema_file_handle}



[2/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log

Posted by bo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f690fb4..28dcc04 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -77,7 +77,7 @@ from airflow.utils.operator_resources import Resources
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 from airflow.utils.trigger_rule import TriggerRule
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 Base = declarative_base()
 ID_LEN = 250
@@ -184,7 +184,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         if executor is None:
             executor = GetDefaultExecutor()
         dag_folder = dag_folder or settings.DAGS_FOLDER
-        self.logger.info("Filling up the DagBag from %s", dag_folder)
+        self.log.info("Filling up the DagBag from %s", dag_folder)
         self.dag_folder = dag_folder
         self.dags = {}
         # the file's last modified timestamp when we last read it
@@ -257,7 +257,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                 return found_dags
 
         except Exception as e:
-            self.logger.exception(e)
+            self.log.exception(e)
             return found_dags
 
         mods = []
@@ -269,7 +269,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                         self.file_last_changed[filepath] = file_last_changed_on_disk
                         return found_dags
 
-            self.logger.debug("Importing %s", filepath)
+            self.log.debug("Importing %s", filepath)
             org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
             mod_name = ('unusual_prefix_' +
                         hashlib.sha1(filepath.encode('utf-8')).hexdigest() +
@@ -283,7 +283,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                     m = imp.load_source(mod_name, filepath)
                     mods.append(m)
                 except Exception as e:
-                    self.logger.exception("Failed to import: %s", filepath)
+                    self.log.exception("Failed to import: %s", filepath)
                     self.import_errors[filepath] = str(e)
                     self.file_last_changed[filepath] = file_last_changed_on_disk
 
@@ -294,10 +294,10 @@ class DagBag(BaseDagBag, LoggingMixin):
                 mod_name, ext = os.path.splitext(mod.filename)
                 if not head and (ext == '.py' or ext == '.pyc'):
                     if mod_name == '__init__':
-                        self.logger.warning("Found __init__.%s at root of %s", ext, filepath)
+                        self.log.warning("Found __init__.%s at root of %s", ext, filepath)
                     if safe_mode:
                         with zip_file.open(mod.filename) as zf:
-                            self.logger.debug("Reading %s from %s", mod.filename, filepath)
+                            self.log.debug("Reading %s from %s", mod.filename, filepath)
                             content = zf.read()
                             if not all([s in content for s in (b'DAG', b'airflow')]):
                                 self.file_last_changed[filepath] = (
@@ -313,7 +313,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                         m = importlib.import_module(mod_name)
                         mods.append(m)
                     except Exception as e:
-                        self.logger.exception("Failed to import: %s", filepath)
+                        self.log.exception("Failed to import: %s", filepath)
                         self.import_errors[filepath] = str(e)
                         self.file_last_changed[filepath] = file_last_changed_on_disk
 
@@ -336,11 +336,11 @@ class DagBag(BaseDagBag, LoggingMixin):
         Fails tasks that haven't had a heartbeat in too long
         """
         from airflow.jobs import LocalTaskJob as LJ
-        self.logger.info("Finding 'running' jobs without a recent heartbeat")
+        self.log.info("Finding 'running' jobs without a recent heartbeat")
         TI = TaskInstance
         secs = configuration.getint('scheduler', 'scheduler_zombie_task_threshold')
         limit_dttm = datetime.now() - timedelta(seconds=secs)
-        self.logger.info("Failing jobs without heartbeat after %s", limit_dttm)
+        self.log.info("Failing jobs without heartbeat after %s", limit_dttm)
 
         tis = (
             session.query(TI)
@@ -361,7 +361,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                     task = dag.get_task(ti.task_id)
                     ti.task = task
                     ti.handle_failure("{} killed as zombie".format(str(ti)))
-                    self.logger.info('Marked zombie job %s as failed', ti)
+                    self.log.info('Marked zombie job %s as failed', ti)
                     Stats.incr('zombies_killed')
         session.commit()
 
@@ -381,7 +381,7 @@ class DagBag(BaseDagBag, LoggingMixin):
             subdag.parent_dag = dag
             subdag.is_subdag = True
             self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
-        self.logger.debug('Loaded DAG {dag}'.format(**locals()))
+        self.log.debug('Loaded DAG {dag}'.format(**locals()))
 
     def collect_dags(
             self,
@@ -439,7 +439,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                                 str([dag.dag_id for dag in found_dags]),
                             ))
                     except Exception as e:
-                        self.logger.warning(e)
+                        self.log.warning(e)
         Stats.gauge(
             'collect_dags', (datetime.now() - start_dttm).total_seconds(), 1)
         Stats.gauge(
@@ -606,7 +606,7 @@ class Connection(Base, LoggingMixin):
                 self._password = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_encrypted = True
             except AirflowException:
-                self.logger.exception("Failed to load fernet while encrypting value, "
+                self.log.exception("Failed to load fernet while encrypting value, "
                                     "using non-encrypted value.")
                 self._password = value
                 self.is_encrypted = False
@@ -635,7 +635,7 @@ class Connection(Base, LoggingMixin):
                 self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_extra_encrypted = True
             except AirflowException:
-                self.logger.exception("Failed to load fernet while encrypting value, "
+                self.log.exception("Failed to load fernet while encrypting value, "
                                     "using non-encrypted value.")
                 self._extra = value
                 self.is_extra_encrypted = False
@@ -706,8 +706,8 @@ class Connection(Base, LoggingMixin):
             try:
                 obj = json.loads(self.extra)
             except Exception as e:
-                self.logger.exception(e)
-                self.logger.error("Failed parsing the json for conn_id %s", self.conn_id)
+                self.log.exception(e)
+                self.log.error("Failed parsing the json for conn_id %s", self.conn_id)
 
         return obj
 
@@ -1001,7 +1001,7 @@ class TaskInstance(Base, LoggingMixin):
         """
         Forces the task instance's state to FAILED in the database.
         """
-        self.logger.error("Recording the task instance as FAILED")
+        self.log.error("Recording the task instance as FAILED")
         self.state = State.FAILED
         session.merge(self)
         session.commit()
@@ -1152,7 +1152,7 @@ class TaskInstance(Base, LoggingMixin):
                 session=session):
             failed = True
             if verbose:
-                self.logger.info(
+                self.log.info(
                     "Dependencies not met for %s, dependency '%s' FAILED: %s",
                     self, dep_status.dep_name, dep_status.reason
                 )
@@ -1161,7 +1161,7 @@ class TaskInstance(Base, LoggingMixin):
             return False
 
         if verbose:
-            self.logger.info("Dependencies all met for %s", self)
+            self.log.info("Dependencies all met for %s", self)
 
         return True
 
@@ -1177,7 +1177,7 @@ class TaskInstance(Base, LoggingMixin):
                     session,
                     dep_context):
 
-                self.logger.debug(
+                self.log.debug(
                     "%s dependency '%s' PASSED: %s, %s",
                     self, dep_status.dep_name, dep_status.passed, dep_status.reason
                 )
@@ -1354,10 +1354,10 @@ class TaskInstance(Base, LoggingMixin):
                    "runtime. Attempt {attempt} of {total}. State set to NONE.").format(
                 attempt=self.try_number + 1,
                 total=self.max_tries + 1)
-            self.logger.warning(hr + msg + hr)
+            self.log.warning(hr + msg + hr)
 
             self.queued_dttm = datetime.now()
-            self.logger.info("Queuing into pool %s", self.pool)
+            self.log.info("Queuing into pool %s", self.pool)
             session.merge(self)
             session.commit()
             return False
@@ -1366,12 +1366,12 @@ class TaskInstance(Base, LoggingMixin):
         # the current worker process was blocked on refresh_from_db
         if self.state == State.RUNNING:
             msg = "Task Instance already running {}".format(self)
-            self.logger.warning(msg)
+            self.log.warning(msg)
             session.commit()
             return False
 
         # print status message
-        self.logger.info(hr + msg + hr)
+        self.log.info(hr + msg + hr)
         self.try_number += 1
 
         if not test_mode:
@@ -1389,10 +1389,10 @@ class TaskInstance(Base, LoggingMixin):
         if verbose:
             if mark_success:
                 msg = "Marking success for {} on {}".format(self.task, self.execution_date)
-                self.logger.info(msg)
+                self.log.info(msg)
             else:
                 msg = "Executing {} on {}".format(self.task, self.execution_date)
-                self.logger.info(msg)
+                self.log.info(msg)
         return True
 
     @provide_session
@@ -1434,7 +1434,7 @@ class TaskInstance(Base, LoggingMixin):
 
                 def signal_handler(signum, frame):
                     """Setting kill signal handler"""
-                    self.logger.error("Killing subprocess")
+                    self.log.error("Killing subprocess")
                     task_copy.on_kill()
                     raise AirflowException("Task received SIGTERM signal")
                 signal.signal(signal.SIGTERM, signal_handler)
@@ -1513,8 +1513,8 @@ class TaskInstance(Base, LoggingMixin):
             if task.on_success_callback:
                 task.on_success_callback(context)
         except Exception as e3:
-            self.logger.error("Failed when executing success callback")
-            self.logger.exception(e3)
+            self.log.error("Failed when executing success callback")
+            self.log.exception(e3)
 
         session.commit()
 
@@ -1559,7 +1559,7 @@ class TaskInstance(Base, LoggingMixin):
         task_copy.dry_run()
 
     def handle_failure(self, error, test_mode=False, context=None):
-        self.logger.exception(error)
+        self.log.exception(error)
         task = self.task
         session = settings.Session()
         self.end_date = datetime.now()
@@ -1580,20 +1580,20 @@ class TaskInstance(Base, LoggingMixin):
             # next task instance try_number exceeds the max_tries.
             if task.retries and self.try_number <= self.max_tries:
                 self.state = State.UP_FOR_RETRY
-                self.logger.info('Marking task as UP_FOR_RETRY')
+                self.log.info('Marking task as UP_FOR_RETRY')
                 if task.email_on_retry and task.email:
                     self.email_alert(error, is_retry=True)
             else:
                 self.state = State.FAILED
                 if task.retries:
-                    self.logger.info('All retries failed; marking task as FAILED')
+                    self.log.info('All retries failed; marking task as FAILED')
                 else:
-                    self.logger.info('Marking task as FAILED.')
+                    self.log.info('Marking task as FAILED.')
                 if task.email_on_failure and task.email:
                     self.email_alert(error, is_retry=False)
         except Exception as e2:
-            self.logger.error('Failed to send email to: %s', task.email)
-            self.logger.exception(e2)
+            self.log.error('Failed to send email to: %s', task.email)
+            self.log.exception(e2)
 
         # Handling callbacks pessimistically
         try:
@@ -1602,13 +1602,13 @@ class TaskInstance(Base, LoggingMixin):
             if self.state == State.FAILED and task.on_failure_callback:
                 task.on_failure_callback(context)
         except Exception as e3:
-            self.logger.error("Failed at executing callback")
-            self.logger.exception(e3)
+            self.log.error("Failed at executing callback")
+            self.log.exception(e3)
 
         if not test_mode:
             session.merge(self)
         session.commit()
-        self.logger.error(str(error))
+        self.log.error(str(error))
 
     @provide_session
     def get_template_context(self, session=None):
@@ -1898,7 +1898,7 @@ class Log(Base):
         self.owner = owner or task_owner
 
 
-class SkipMixin(object):
+class SkipMixin(LoggingMixin):
     def skip(self, dag_run, execution_date, tasks):
         """
         Sets tasks instances to skipped from the same dag run.
@@ -1926,7 +1926,7 @@ class SkipMixin(object):
         else:
             assert execution_date is not None, "Execution date is None and no dag run"
 
-            self.logger.warning("No DAG RUN present this should not happen")
+            self.log.warning("No DAG RUN present this should not happen")
             # this is defensive against dag runs that are not complete
             for task in tasks:
                 ti = TaskInstance(task, execution_date=execution_date)
@@ -2121,7 +2121,7 @@ class BaseOperator(LoggingMixin):
         self.email_on_failure = email_on_failure
         self.start_date = start_date
         if start_date and not isinstance(start_date, datetime):
-            self.logger.warning("start_date for %s isn't datetime.datetime", self)
+            self.log.warning("start_date for %s isn't datetime.datetime", self)
         self.end_date = end_date
         if not TriggerRule.is_valid(trigger_rule):
             raise AirflowException(
@@ -2137,7 +2137,7 @@ class BaseOperator(LoggingMixin):
             self.depends_on_past = True
 
         if schedule_interval:
-            self.logger.warning(
+            self.log.warning(
                 "schedule_interval is used for {}, though it has "
                 "been deprecated as a task parameter, you need to "
                 "specify it as a DAG parameter instead",
@@ -2155,7 +2155,7 @@ class BaseOperator(LoggingMixin):
         if isinstance(retry_delay, timedelta):
             self.retry_delay = retry_delay
         else:
-            self.logger.debug("Retry_delay isn't timedelta object, assuming secs")
+            self.log.debug("Retry_delay isn't timedelta object, assuming secs")
             self.retry_delay = timedelta(seconds=retry_delay)
         self.retry_exponential_backoff = retry_exponential_backoff
         self.max_retry_delay = max_retry_delay
@@ -2455,7 +2455,7 @@ class BaseOperator(LoggingMixin):
                 try:
                     setattr(self, attr, env.loader.get_source(env, content)[0])
                 except Exception as e:
-                    self.logger.exception(e)
+                    self.log.exception(e)
         self.prepare_template()
 
     @property
@@ -2574,12 +2574,12 @@ class BaseOperator(LoggingMixin):
                 ignore_ti_state=ignore_ti_state)
 
     def dry_run(self):
-        self.logger.info('Dry run')
+        self.log.info('Dry run')
         for attr in self.template_fields:
             content = getattr(self, attr)
             if content and isinstance(content, six.string_types):
-                self.logger.info('Rendering template for %s', attr)
-                self.logger.info(content)
+                self.log.info('Rendering template for %s', attr)
+                self.log.info(content)
 
     def get_direct_relatives(self, upstream=False):
         """
@@ -3517,7 +3517,7 @@ class DAG(BaseDag, LoggingMixin):
             d['pickle_len'] = len(pickled)
             d['pickling_duration'] = "{}".format(datetime.now() - dttm)
         except Exception as e:
-            self.logger.debug(e)
+            self.log.debug(e)
             d['is_picklable'] = False
             d['stacktrace'] = traceback.format_exc()
         return d
@@ -3754,7 +3754,7 @@ class DAG(BaseDag, LoggingMixin):
             DagModel).filter(DagModel.dag_id == self.dag_id).first()
         if not orm_dag:
             orm_dag = DagModel(dag_id=self.dag_id)
-            self.logger.info("Creating ORM DAG for %s", self.dag_id)
+            self.log.info("Creating ORM DAG for %s", self.dag_id)
         orm_dag.fileloc = self.fileloc
         orm_dag.is_subdag = self.is_subdag
         orm_dag.owners = owner
@@ -3797,11 +3797,11 @@ class DAG(BaseDag, LoggingMixin):
         :type expiration_date: datetime
         :return: None
         """
-        logger = LoggingMixin().logger
+        log = LoggingMixin().log
         for dag in session.query(
                 DagModel).filter(DagModel.last_scheduler_run < expiration_date,
                                  DagModel.is_active).all():
-            logger.info(
+            log.info(
                 "Deactivating DAG ID %s since it was last touched by the scheduler at %s",
                 dag.dag_id, dag.last_scheduler_run.isoformat()
             )
@@ -3930,7 +3930,7 @@ class Variable(Base, LoggingMixin):
                 self._val = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_encrypted = True
             except AirflowException:
-                self.logger.exception(
+                self.log.exception(
                     "Failed to load fernet while encrypting value, using non-encrypted value."
                 )
                 self._val = value
@@ -4052,7 +4052,7 @@ class XCom(Base, LoggingMixin):
             try:
                 value = json.dumps(value).encode('UTF-8')
             except ValueError:
-                log = LoggingMixin().logger
+                log = LoggingMixin().log
                 log.error("Could not serialize the XCOM value into JSON. "
                           "If you are using pickles instead of JSON "
                           "for XCOM, then you need to enable pickle "
@@ -4123,7 +4123,7 @@ class XCom(Base, LoggingMixin):
                 try:
                     return json.loads(result.value.decode('UTF-8'))
                 except ValueError:
-                    log = LoggingMixin().logger
+                    log = LoggingMixin().log
                     log.error("Could not serialize the XCOM value into JSON. "
                               "If you are using pickles instead of JSON "
                               "for XCOM, then you need to enable pickle "
@@ -4173,7 +4173,7 @@ class XCom(Base, LoggingMixin):
                 try:
                     result.value = json.loads(result.value.decode('UTF-8'))
                 except ValueError:
-                    log = LoggingMixin().logger
+                    log = LoggingMixin().log
                     log.error("Could not serialize the XCOM value into JSON. "
                               "If you are using pickles instead of JSON "
                               "for XCOM, then you need to enable pickle "
@@ -4229,7 +4229,7 @@ class DagStat(Base):
             session.commit()
         except Exception as e:
             session.rollback()
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.warning("Could not update dag stats for %s", dag_id)
             log.exception(e)
 
@@ -4282,7 +4282,7 @@ class DagStat(Base):
             session.commit()
         except Exception as e:
             session.rollback()
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.warning("Could not update dag stat table")
             log.exception(e)
 
@@ -4306,7 +4306,7 @@ class DagStat(Base):
                     session.commit()
                 except Exception as e:
                     session.rollback()
-                    log = LoggingMixin().logger
+                    log = LoggingMixin().log
                     log.warning("Could not create stat record")
                     log.exception(e)
 
@@ -4524,7 +4524,7 @@ class DagRun(Base, LoggingMixin):
 
         tis = self.get_task_instances(session=session)
 
-        self.logger.info("Updating state for %s considering %s task(s)", self, len(tis))
+        self.log.info("Updating state for %s considering %s task(s)", self, len(tis))
 
         for ti in list(tis):
             # skip in db?
@@ -4570,18 +4570,18 @@ class DagRun(Base, LoggingMixin):
             # if all roots finished and at least on failed, the run failed
             if (not unfinished_tasks and
                     any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
-                self.logger.info('Marking run %s failed', self)
+                self.log.info('Marking run %s failed', self)
                 self.state = State.FAILED
 
             # if all roots succeeded and no unfinished tasks, the run succeeded
             elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
                                               for r in roots):
-                self.logger.info('Marking run %s successful', self)
+                self.log.info('Marking run %s successful', self)
                 self.state = State.SUCCESS
 
             # if *all tasks* are deadlocked, the run failed
             elif unfinished_tasks and none_depends_on_past and no_dependencies_met:
-                self.logger.info('Deadlock; marking run %s failed', self)
+                self.log.info('Deadlock; marking run %s failed', self)
                 self.state = State.FAILED
 
             # finally, if the roots aren't done, the dag is still running

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 63321fb..ff2ed51 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -67,7 +67,7 @@ class BashOperator(BaseOperator):
         which will be cleaned afterwards
         """
         bash_command = self.bash_command
-        self.logger.info("Tmp dir root location: \n %s", gettempdir())
+        self.log.info("Tmp dir root location: \n %s", gettempdir())
         with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
             with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
 
@@ -75,11 +75,11 @@ class BashOperator(BaseOperator):
                 f.flush()
                 fname = f.name
                 script_location = tmp_dir + "/" + fname
-                self.logger.info(
+                self.log.info(
                     "Temporary script location: %s",
                     script_location
                 )
-                self.logger.info("Running command: %s", bash_command)
+                self.log.info("Running command: %s", bash_command)
                 sp = Popen(
                     ['bash', fname],
                     stdout=PIPE, stderr=STDOUT,
@@ -88,13 +88,13 @@ class BashOperator(BaseOperator):
 
                 self.sp = sp
 
-                self.logger.info("Output:")
+                self.log.info("Output:")
                 line = ''
                 for line in iter(sp.stdout.readline, b''):
                     line = line.decode(self.output_encoding).strip()
-                    self.logger.info(line)
+                    self.log.info(line)
                 sp.wait()
-                self.logger.info(
+                self.log.info(
                     "Command exited with return code %s",
                     sp.returncode
                 )
@@ -106,6 +106,6 @@ class BashOperator(BaseOperator):
             return line
 
     def on_kill(self):
-        self.logger.info('Sending SIGTERM signal to bash process group')
+        self.log.info('Sending SIGTERM signal to bash process group')
         os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index f263a2c..ff82539 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -71,15 +71,15 @@ class CheckOperator(BaseOperator):
         self.sql = sql
 
     def execute(self, context=None):
-        self.logger.info('Executing SQL check: %s', self.sql)
+        self.log.info('Executing SQL check: %s', self.sql)
         records = self.get_db_hook().get_first(self.sql)
-        self.logger.info('Record: %s', records)
+        self.log.info('Record: %s', records)
         if not records:
             raise AirflowException("The query returned None")
         elif not all([bool(r) for r in records]):
             exceptstr = "Test failed.\nQuery:\n{q}\nResults:\n{r!s}"
             raise AirflowException(exceptstr.format(q=self.sql, r=records))
-        self.logger.info("Success.")
+        self.log.info("Success.")
 
     def get_db_hook(self):
         return BaseHook.get_hook(conn_id=self.conn_id)
@@ -134,7 +134,7 @@ class ValueCheckOperator(BaseOperator):
         self.has_tolerance = self.tol is not None
 
     def execute(self, context=None):
-        self.logger.info('Executing SQL check: %s', self.sql)
+        self.log.info('Executing SQL check: %s', self.sql)
         records = self.get_db_hook().get_first(self.sql)
         if not records:
             raise AirflowException("The query returned None")
@@ -208,9 +208,9 @@ class IntervalCheckOperator(BaseOperator):
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-        self.logger.info('Executing SQL check: %s', self.sql2)
+        self.log.info('Executing SQL check: %s', self.sql2)
         row2 = hook.get_first(self.sql2)
-        self.logger.info('Executing SQL check: %s', self.sql1)
+        self.log.info('Executing SQL check: %s', self.sql1)
         row1 = hook.get_first(self.sql1)
         if not row2:
             raise AirflowException("The query {q} returned None".format(q=self.sql2))
@@ -230,20 +230,20 @@ class IntervalCheckOperator(BaseOperator):
             else:
                 ratio = float(max(current[m], reference[m])) / \
                     min(current[m], reference[m])
-            self.logger.info(rlog.format(m, ratio, self.metrics_thresholds[m]))
+            self.log.info(rlog.format(m, ratio, self.metrics_thresholds[m]))
             ratios[m] = ratio
             test_results[m] = ratio < self.metrics_thresholds[m]
         if not all(test_results.values()):
             failed_tests = [it[0] for it in test_results.items() if not it[1]]
             j = len(failed_tests)
             n = len(self.metrics_sorted)
-            self.logger.warning(countstr.format(**locals()))
+            self.log.warning(countstr.format(**locals()))
             for k in failed_tests:
-                self.logger.warning(
+                self.log.warning(
                     fstr.format(k=k, r=ratios[k], tr=self.metrics_thresholds[k])
                 )
             raise AirflowException(estr.format(", ".join(failed_tests)))
-        self.logger.info("All tests have passed")
+        self.log.info("All tests have passed")
 
     def get_db_hook(self):
         return BaseHook.get_hook(conn_id=self.conn_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index bd2862b..3a952cd 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -69,9 +69,9 @@ class TriggerDagRunOperator(BaseOperator):
                 state=State.RUNNING,
                 conf=dro.payload,
                 external_trigger=True)
-            self.logger.info("Creating DagRun %s", dr)
+            self.log.info("Creating DagRun %s", dr)
             session.add(dr)
             session.commit()
             session.close()
         else:
-            self.logger.info("Criteria not met, moving on")
+            self.log.info("Criteria not met, moving on")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/docker_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py
index 8a333d6..3011f1c 100644
--- a/airflow/operators/docker_operator.py
+++ b/airflow/operators/docker_operator.py
@@ -134,7 +134,7 @@ class DockerOperator(BaseOperator):
         self.container = None
 
     def execute(self, context):
-        self.logger.info('Starting docker container from image %s', self.image)
+        self.log.info('Starting docker container from image %s', self.image)
 
         tls_config = None
         if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
@@ -155,10 +155,10 @@ class DockerOperator(BaseOperator):
             image = self.image
 
         if self.force_pull or len(self.cli.images(name=image)) == 0:
-            self.logger.info('Pulling docker image %s', image)
+            self.log.info('Pulling docker image %s', image)
             for l in self.cli.pull(image, stream=True):
                 output = json.loads(l.decode('utf-8'))
-                self.logger.info("%s", output['status'])
+                self.log.info("%s", output['status'])
 
         cpu_shares = int(round(self.cpus * 1024))
 
@@ -184,7 +184,7 @@ class DockerOperator(BaseOperator):
                 line = line.strip()
                 if hasattr(line, 'decode'):
                     line = line.decode('utf-8')
-                self.logger.info(line)
+                self.log.info(line)
 
             exit_code = self.cli.wait(self.container['Id'])
             if exit_code != 0:
@@ -202,5 +202,5 @@ class DockerOperator(BaseOperator):
 
     def on_kill(self):
         if self.cli is not None:
-            self.logger.info('Stopping docker container')
+            self.log.info('Stopping docker container')
             self.cli.stop(self.container['Id'])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/generic_transfer.py
----------------------------------------------------------------------
diff --git a/airflow/operators/generic_transfer.py b/airflow/operators/generic_transfer.py
index 790749a..c8a2a58 100644
--- a/airflow/operators/generic_transfer.py
+++ b/airflow/operators/generic_transfer.py
@@ -61,15 +61,15 @@ class GenericTransfer(BaseOperator):
     def execute(self, context):
         source_hook = BaseHook.get_hook(self.source_conn_id)
 
-        self.logger.info("Extracting data from %s", self.source_conn_id)
-        self.logger.info("Executing: \n %s", self.sql)
+        self.log.info("Extracting data from %s", self.source_conn_id)
+        self.log.info("Executing: \n %s", self.sql)
         results = source_hook.get_records(self.sql)
 
         destination_hook = BaseHook.get_hook(self.destination_conn_id)
         if self.preoperator:
-            self.logger.info("Running preoperator")
-            self.logger.info(self.preoperator)
+            self.log.info("Running preoperator")
+            self.log.info(self.preoperator)
             destination_hook.run(self.preoperator)
 
-        self.logger.info("Inserting rows into %s", self.destination_conn_id)
+        self.log.info("Inserting rows into %s", self.destination_conn_id)
         destination_hook.insert_rows(table=self.destination_table, rows=results)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index 983069b..221feeb 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -93,7 +93,7 @@ class HiveOperator(BaseOperator):
             self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.hql)
+        self.log.info('Executing: %s', self.hql)
         self.hook = self.get_hook()
         self.hook.run_cli(hql=self.hql, schema=self.schema,
                           hive_conf=context_to_airflow_vars(context))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_stats_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py
index 025e427..896547e 100644
--- a/airflow/operators/hive_stats_operator.py
+++ b/airflow/operators/hive_stats_operator.py
@@ -139,15 +139,15 @@ class HiveStatsCollectionOperator(BaseOperator):
         """.format(**locals())
 
         hook = PrestoHook(presto_conn_id=self.presto_conn_id)
-        self.logger.info('Executing SQL check: %s', sql)
+        self.log.info('Executing SQL check: %s', sql)
         row = hook.get_first(hql=sql)
-        self.logger.info("Record: %s", row)
+        self.log.info("Record: %s", row)
         if not row:
             raise AirflowException("The query returned None")
 
         part_json = json.dumps(self.partition, sort_keys=True)
 
-        self.logger.info("Deleting rows from previous runs if they exist")
+        self.log.info("Deleting rows from previous runs if they exist")
         mysql = MySqlHook(self.mysql_conn_id)
         sql = """
         SELECT 1 FROM hive_stats
@@ -167,7 +167,7 @@ class HiveStatsCollectionOperator(BaseOperator):
             """.format(**locals())
             mysql.run(sql)
 
-        self.logger.info("Pivoting and loading cells into the Airflow db")
+        self.log.info("Pivoting and loading cells into the Airflow db")
         rows = [
             (self.ds, self.dttm, self.table, part_json) +
             (r[0][0], r[0][1], r[1])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index d7b1b82..e420dfd 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -92,7 +92,7 @@ class HiveToDruidTransfer(BaseOperator):
 
     def execute(self, context):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
-        self.logger.info("Extracting data from Hive")
+        self.log.info("Extracting data from Hive")
         hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
         sql = self.sql.strip().strip(';')
         tblproperties = ''.join([", '{}' = '{}'".format(k, v) for k, v in self.hive_tblproperties.items()])
@@ -107,7 +107,7 @@ class HiveToDruidTransfer(BaseOperator):
         AS
         {sql}
         """.format(**locals())
-        self.logger.info("Running command:\n %s", hql)
+        self.log.info("Running command:\n %s", hql)
         hive.run_cli(hql)
 
         m = HiveMetastoreHook(self.metastore_conn_id)
@@ -131,13 +131,13 @@ class HiveToDruidTransfer(BaseOperator):
                 columns=columns,
             )
 
-            self.logger.info("Inserting rows into Druid, hdfs path: %s", static_path)
+            self.log.info("Inserting rows into Druid, hdfs path: %s", static_path)
 
             druid.submit_indexing_job(index_spec)
 
-            self.logger.info("Load seems to have succeeded!")
+            self.log.info("Load seems to have succeeded!")
         finally:
-            self.logger.info(
+            self.log.info(
                 "Cleaning up by dropping the temp Hive table %s",
                 hive_table
             )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_mysql.py b/airflow/operators/hive_to_mysql.py
index e82a099..d2d9d0c 100644
--- a/airflow/operators/hive_to_mysql.py
+++ b/airflow/operators/hive_to_mysql.py
@@ -77,7 +77,7 @@ class HiveToMySqlTransfer(BaseOperator):
 
     def execute(self, context):
         hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
-        self.logger.info("Extracting data from Hive: %s", self.sql)
+        self.log.info("Extracting data from Hive: %s", self.sql)
 
         if self.bulk_load:
             tmpfile = NamedTemporaryFile()
@@ -88,10 +88,10 @@ class HiveToMySqlTransfer(BaseOperator):
 
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
         if self.mysql_preoperator:
-            self.logger.info("Running MySQL preoperator")
+            self.log.info("Running MySQL preoperator")
             mysql.run(self.mysql_preoperator)
 
-        self.logger.info("Inserting rows into MySQL")
+        self.log.info("Inserting rows into MySQL")
 
         if self.bulk_load:
             mysql.bulk_load(table=self.mysql_table, tmp_file=tmpfile.name)
@@ -100,7 +100,7 @@ class HiveToMySqlTransfer(BaseOperator):
             mysql.insert_rows(table=self.mysql_table, rows=results)
 
         if self.mysql_postoperator:
-            self.logger.info("Running MySQL postoperator")
+            self.log.info("Running MySQL postoperator")
             mysql.run(self.mysql_postoperator)
 
-        self.logger.info("Done.")
+        self.log.info("Done.")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_samba_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_samba_operator.py b/airflow/operators/hive_to_samba_operator.py
index d6e6dec..93ebec1 100644
--- a/airflow/operators/hive_to_samba_operator.py
+++ b/airflow/operators/hive_to_samba_operator.py
@@ -53,7 +53,7 @@ class Hive2SambaOperator(BaseOperator):
         samba = SambaHook(samba_conn_id=self.samba_conn_id)
         hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
         tmpfile = tempfile.NamedTemporaryFile()
-        self.logger.info("Fetching file from Hive")
+        self.log.info("Fetching file from Hive")
         hive.to_csv(hql=self.hql, csv_filepath=tmpfile.name)
-        self.logger.info("Pushing to samba")
+        self.log.info("Pushing to samba")
         samba.push_from_local(self.destination_filepath, tmpfile.name)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py
index d92c931..63b892c 100644
--- a/airflow/operators/http_operator.py
+++ b/airflow/operators/http_operator.py
@@ -74,7 +74,7 @@ class SimpleHttpOperator(BaseOperator):
     def execute(self, context):
         http = HttpHook(self.method, http_conn_id=self.http_conn_id)
 
-        self.logger.info("Calling HTTP method")
+        self.log.info("Calling HTTP method")
 
         response = http.run(self.endpoint,
                             self.data,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/jdbc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py
index 942e312..4ec2fa0 100644
--- a/airflow/operators/jdbc_operator.py
+++ b/airflow/operators/jdbc_operator.py
@@ -55,6 +55,6 @@ class JdbcOperator(BaseOperator):
         self.autocommit = autocommit
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id)
         self.hook.run(self.sql, self.autocommit, parameters=self.parameters)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
index 58f7e67..a1e2a0c 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -32,29 +32,29 @@ class LatestOnlyOperator(BaseOperator, SkipMixin):
         # If the DAG Run is externally triggered, then return without
         # skipping downstream tasks
         if context['dag_run'] and context['dag_run'].external_trigger:
-            self.logger.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
             return
 
         now = datetime.datetime.now()
         left_window = context['dag'].following_schedule(
             context['execution_date'])
         right_window = context['dag'].following_schedule(left_window)
-        self.logger.info(
+        self.log.info(
             'Checking latest only with left_window: %s right_window: %s now: %s',
             left_window, right_window, now
         )
 
         if not left_window < now <= right_window:
-            self.logger.info('Not latest execution, skipping downstream.')
+            self.log.info('Not latest execution, skipping downstream.')
 
             downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-            self.logger.debug("Downstream task_ids %s", downstream_tasks)
+            self.log.debug("Downstream task_ids %s", downstream_tasks)
 
             if downstream_tasks:
                 self.skip(context['dag_run'],
                           context['ti'].execution_date,
                           downstream_tasks)
 
-            self.logger.info('Done.')
+            self.log.info('Done.')
         else:
-            self.logger.info('Latest, allowing execution to proceed.')
+            self.log.info('Latest, allowing execution to proceed.')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mssql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_operator.py b/airflow/operators/mssql_operator.py
index bc0822f..3455232 100644
--- a/airflow/operators/mssql_operator.py
+++ b/airflow/operators/mssql_operator.py
@@ -44,7 +44,7 @@ class MsSqlOperator(BaseOperator):
         self.database = database
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id,
                          schema=self.database)
         hook.run(self.sql, autocommit=self.autocommit,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mssql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py
index 719ddd2..c2c858d 100644
--- a/airflow/operators/mssql_to_hive.py
+++ b/airflow/operators/mssql_to_hive.py
@@ -102,7 +102,7 @@ class MsSqlToHiveTransfer(BaseOperator):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         mssql = MsSqlHook(mssql_conn_id=self.mssql_conn_id)
 
-        self.logger.info("Dumping Microsoft SQL Server query results to local file")
+        self.log.info("Dumping Microsoft SQL Server query results to local file")
         conn = mssql.get_conn()
         cursor = conn.cursor()
         cursor.execute(self.sql)
@@ -118,7 +118,7 @@ class MsSqlToHiveTransfer(BaseOperator):
             f.flush()
             cursor.close()
             conn.close()
-            self.logger.info("Loading file into Hive")
+            self.log.info("Loading file into Hive")
             hive.load_file(
                 f.name,
                 self.hive_table,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mysql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_operator.py b/airflow/operators/mysql_operator.py
index 923eaf8..20f1b7e 100644
--- a/airflow/operators/mysql_operator.py
+++ b/airflow/operators/mysql_operator.py
@@ -46,7 +46,7 @@ class MySqlOperator(BaseOperator):
         self.database = database
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                          schema=self.database)
         hook.run(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mysql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index fde92b5..cd472a8 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -110,7 +110,7 @@ class MySqlToHiveTransfer(BaseOperator):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
 
-        self.logger.info("Dumping MySQL query results to local file")
+        self.log.info("Dumping MySQL query results to local file")
         conn = mysql.get_conn()
         cursor = conn.cursor()
         cursor.execute(self.sql)
@@ -123,7 +123,7 @@ class MySqlToHiveTransfer(BaseOperator):
             f.flush()
             cursor.close()
             conn.close()
-            self.logger.info("Loading file into Hive")
+            self.log.info("Loading file into Hive")
             hive.load_file(
                 f.name,
                 self.hive_table,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/oracle_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/oracle_operator.py b/airflow/operators/oracle_operator.py
index f87bbf9..9a35267 100644
--- a/airflow/operators/oracle_operator.py
+++ b/airflow/operators/oracle_operator.py
@@ -42,7 +42,7 @@ class OracleOperator(BaseOperator):
         self.parameters = parameters
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
         hook.run(
             self.sql,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/pig_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/pig_operator.py b/airflow/operators/pig_operator.py
index cdce48a..a4e4e5c 100644
--- a/airflow/operators/pig_operator.py
+++ b/airflow/operators/pig_operator.py
@@ -59,7 +59,7 @@ class PigOperator(BaseOperator):
                 "(\$([a-zA-Z_][a-zA-Z0-9_]*))", "{{ \g<2> }}", self.pig)
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.pig)
+        self.log.info('Executing: %s', self.pig)
         self.hook = self.get_hook()
         self.hook.run_cli(pig=self.pig)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/postgres_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/postgres_operator.py b/airflow/operators/postgres_operator.py
index 55c1573..c93dc7b 100644
--- a/airflow/operators/postgres_operator.py
+++ b/airflow/operators/postgres_operator.py
@@ -49,7 +49,7 @@ class PostgresOperator(BaseOperator):
         self.database = database
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
                                  schema=self.database)
         self.hook.run(self.sql, self.autocommit, parameters=self.parameters)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/presto_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/operators/presto_to_mysql.py b/airflow/operators/presto_to_mysql.py
index 48158ca..d0c323a 100644
--- a/airflow/operators/presto_to_mysql.py
+++ b/airflow/operators/presto_to_mysql.py
@@ -61,14 +61,14 @@ class PrestoToMySqlTransfer(BaseOperator):
 
     def execute(self, context):
         presto = PrestoHook(presto_conn_id=self.presto_conn_id)
-        self.logger.info("Extracting data from Presto: %s", self.sql)
+        self.log.info("Extracting data from Presto: %s", self.sql)
         results = presto.get_records(self.sql)
 
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
         if self.mysql_preoperator:
-            self.logger.info("Running MySQL preoperator")
-            self.logger.info(self.mysql_preoperator)
+            self.log.info("Running MySQL preoperator")
+            self.log.info(self.mysql_preoperator)
             mysql.run(self.mysql_preoperator)
 
-        self.logger.info("Inserting rows into MySQL")
+        self.log.info("Inserting rows into MySQL")
         mysql.insert_rows(table=self.mysql_table, rows=results)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 56837ec..718c88f 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -87,7 +87,7 @@ class PythonOperator(BaseOperator):
             self.op_kwargs = context
 
         return_value = self.execute_callable()
-        self.logger.info("Done. Returned value was: %s", return_value)
+        self.log.info("Done. Returned value was: %s", return_value)
         return return_value
 
     def execute_callable(self):
@@ -115,17 +115,17 @@ class BranchPythonOperator(PythonOperator, SkipMixin):
     """
     def execute(self, context):
         branch = super(BranchPythonOperator, self).execute(context)
-        self.logger.info("Following branch %s", branch)
-        self.logger.info("Marking other directly downstream tasks as skipped")
+        self.log.info("Following branch %s", branch)
+        self.log.info("Marking other directly downstream tasks as skipped")
 
         downstream_tasks = context['task'].downstream_list
-        self.logger.debug("Downstream task_ids %s", downstream_tasks)
+        self.log.debug("Downstream task_ids %s", downstream_tasks)
 
         skip_tasks = [t for t in downstream_tasks if t.task_id != branch]
         if downstream_tasks:
             self.skip(context['dag_run'], context['ti'].execution_date, skip_tasks)
 
-        self.logger.info("Done.")
+        self.log.info("Done.")
 
 
 class ShortCircuitOperator(PythonOperator, SkipMixin):
@@ -142,21 +142,21 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     def execute(self, context):
         condition = super(ShortCircuitOperator, self).execute(context)
-        self.logger.info("Condition result is %s", condition)
+        self.log.info("Condition result is %s", condition)
 
         if condition:
-            self.logger.info('Proceeding with downstream tasks...')
+            self.log.info('Proceeding with downstream tasks...')
             return
 
-        self.logger.info('Skipping downstream tasks...')
+        self.log.info('Skipping downstream tasks...')
 
         downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-        self.logger.debug("Downstream task_ids %s", downstream_tasks)
+        self.log.debug("Downstream task_ids %s", downstream_tasks)
 
         if downstream_tasks:
             self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
 
-        self.logger.info("Done.")
+        self.log.info("Done.")
 
 class PythonVirtualenvOperator(PythonOperator):
     """
@@ -233,7 +233,7 @@ class PythonVirtualenvOperator(PythonOperator):
             # generate filenames
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
-            string_args_filename = os.path.join(tmp_dir, 'string_args.txt') 
+            string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
             # set up virtualenv
@@ -261,12 +261,12 @@ class PythonVirtualenvOperator(PythonOperator):
 
     def _execute_in_subprocess(self, cmd):
         try:
-            self.logger.info("Executing cmd\n{}".format(cmd))
+            self.log.info("Executing cmd\n{}".format(cmd))
             output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
             if output:
-                self.logger.info("Got output\n{}".format(output))
+                self.log.info("Got output\n{}".format(output))
         except subprocess.CalledProcessError as e:
-            self.logger.info("Got error output\n{}".format(e.output))
+            self.log.info("Got error output\n{}".format(e.output))
             raise
 
     def _write_string_args(self, filename):
@@ -294,14 +294,14 @@ class PythonVirtualenvOperator(PythonOperator):
                 else:
                     return pickle.load(f)
             except ValueError:
-                self.logger.error("Error deserializing result. Note that result deserialization "
+                self.log.error("Error deserializing result. Note that result deserialization "
                               "is not supported across major Python versions.")
                 raise
 
     def _write_script(self, script_filename):
         with open(script_filename, 'w') as f:
             python_code = self._generate_python_code()
-            self.logger.debug('Writing code to file\n{}'.format(python_code))
+            self.log.debug('Writing code to file\n{}'.format(python_code))
             f.write(python_code)
 
     def _generate_virtualenv_cmd(self, tmp_dir):
@@ -323,7 +323,7 @@ class PythonVirtualenvOperator(PythonOperator):
     def _generate_python_cmd(self, tmp_dir, script_filename, input_filename, output_filename, string_args_filename):
         # direct path alleviates need to activate
         return ['{}/bin/python'.format(tmp_dir), script_filename, input_filename, output_filename, string_args_filename]
-            
+
     def _generate_python_code(self):
         if self.use_dill:
             pickling_library = 'dill'
@@ -354,3 +354,5 @@ class PythonVirtualenvOperator(PythonOperator):
                 python_callable_name=fn.__name__,
                 pickling_library=pickling_library)
 
+        self.log.info("Done.")
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/redshift_to_s3_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py
index e25d613..683ff9c 100644
--- a/airflow/operators/redshift_to_s3_operator.py
+++ b/airflow/operators/redshift_to_s3_operator.py
@@ -70,7 +70,7 @@ class RedshiftToS3Transfer(BaseOperator):
         a_key, s_key = self.s3.get_credentials()
         unload_options = '\n\t\t\t'.join(self.unload_options)
 
-        self.logger.info("Retrieving headers from %s.%s...", self.schema, self.table)
+        self.log.info("Retrieving headers from %s.%s...", self.schema, self.table)
 
         columns_query = """SELECT column_name
                             FROM information_schema.columns
@@ -99,6 +99,6 @@ class RedshiftToS3Transfer(BaseOperator):
                         """.format(column_names, column_castings, self.schema, self.table,
                                 self.s3_bucket, self.s3_key, a_key, s_key, unload_options)
 
-        self.logger.info('Executing UNLOAD command...')
+        self.log.info('Executing UNLOAD command...')
         self.hook.run(unload_query, self.autocommit)
-        self.logger.info("UNLOAD command complete...")
+        self.log.info("UNLOAD command complete...")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/s3_file_transform_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index 5de5127..68c733c 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -74,12 +74,12 @@ class S3FileTransformOperator(BaseOperator):
     def execute(self, context):
         source_s3 = S3Hook(s3_conn_id=self.source_s3_conn_id)
         dest_s3 = S3Hook(s3_conn_id=self.dest_s3_conn_id)
-        self.logger.info("Downloading source S3 file %s", self.source_s3_key)
+        self.log.info("Downloading source S3 file %s", self.source_s3_key)
         if not source_s3.check_for_key(self.source_s3_key):
             raise AirflowException("The source key {0} does not exist".format(self.source_s3_key))
         source_s3_key_object = source_s3.get_key(self.source_s3_key)
         with NamedTemporaryFile("w") as f_source, NamedTemporaryFile("w") as f_dest:
-            self.logger.info(
+            self.log.info(
                 "Dumping S3 file %s contents to local file %s",
                 self.source_s3_key, f_source.name
             )
@@ -90,20 +90,20 @@ class S3FileTransformOperator(BaseOperator):
                 [self.transform_script, f_source.name, f_dest.name],
                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
             (transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()
-            self.logger.info("Transform script stdout %s", transform_script_stdoutdata)
+            self.log.info("Transform script stdout %s", transform_script_stdoutdata)
             if transform_script_process.returncode > 0:
                 raise AirflowException("Transform script failed %s", transform_script_stderrdata)
             else:
-                self.logger.info(
+                self.log.info(
                     "Transform script successful. Output temporarily located at %s",
                     f_dest.name
                 )
-            self.logger.info("Uploading transformed file to S3")
+            self.log.info("Uploading transformed file to S3")
             f_dest.flush()
             dest_s3.load_file(
                 filename=f_dest.name,
                 key=self.dest_s3_key,
                 replace=self.replace
             )
-            self.logger.info("Upload successful")
+            self.log.info("Upload successful")
             dest_s3.connection.close()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/s3_to_hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index 68fe903..2b4aceb 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -129,7 +129,7 @@ class S3ToHiveTransfer(BaseOperator):
         # Downloading file from S3
         self.s3 = S3Hook(s3_conn_id=self.s3_conn_id)
         self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
-        self.logger.info("Downloading S3 file")
+        self.log.info("Downloading S3 file")
 
         if self.wildcard_match:
             if not self.s3.check_for_wildcard_key(self.s3_key):
@@ -146,13 +146,13 @@ class S3ToHiveTransfer(BaseOperator):
                 NamedTemporaryFile(mode="w",
                                    dir=tmp_dir,
                                    suffix=file_ext) as f:
-            self.logger.info("Dumping S3 key {0} contents to local file {1}"
-                             .format(s3_key_object.key, f.name))
+            self.log.info("Dumping S3 key {0} contents to local file {1}"
+                          .format(s3_key_object.key, f.name))
             s3_key_object.get_contents_to_file(f)
             f.flush()
             self.s3.connection.close()
             if not self.headers:
-                self.logger.info("Loading file %s into Hive", f.name)
+                self.log.info("Loading file %s into Hive", f.name)
                 self.hive.load_file(
                     f.name,
                     self.hive_table,
@@ -165,11 +165,11 @@ class S3ToHiveTransfer(BaseOperator):
             else:
                 # Decompressing file
                 if self.input_compressed:
-                    self.logger.info("Uncompressing file %s", f.name)
+                    self.log.info("Uncompressing file %s", f.name)
                     fn_uncompressed = uncompress_file(f.name,
                                                       file_ext,
                                                       tmp_dir)
-                    self.logger.info("Uncompressed to %s", fn_uncompressed)
+                    self.log.info("Uncompressed to %s", fn_uncompressed)
                     # uncompressed file available now so deleting
                     # compressed file to save disk space
                     f.close()
@@ -178,19 +178,19 @@ class S3ToHiveTransfer(BaseOperator):
 
                 # Testing if header matches field_dict
                 if self.check_headers:
-                    self.logger.info("Matching file header against field_dict")
+                    self.log.info("Matching file header against field_dict")
                     header_list = self._get_top_row_as_list(fn_uncompressed)
                     if not self._match_headers(header_list):
                         raise AirflowException("Header check failed")
 
                 # Deleting top header row
-                self.logger.info("Removing header from file %s", fn_uncompressed)
+                self.log.info("Removing header from file %s", fn_uncompressed)
                 headless_file = (
                     self._delete_top_row_and_compress(fn_uncompressed,
                                                       file_ext,
                                                       tmp_dir))
-                self.logger.info("Headless file %s", headless_file)
-                self.logger.info("Loading file %s into Hive", headless_file)
+                self.log.info("Headless file %s", headless_file)
+                self.log.info("Loading file %s into Hive", headless_file)
                 self.hive.load_file(headless_file,
                                     self.hive_table,
                                     field_dict=self.field_dict,
@@ -211,7 +211,7 @@ class S3ToHiveTransfer(BaseOperator):
             raise AirflowException("Unable to retrieve header row from file")
         field_names = self.field_dict.keys()
         if len(field_names) != len(header_list):
-            self.logger.warning("Headers count mismatch"
+            self.log.warning("Headers count mismatch"
                               "File headers:\n {header_list}\n"
                               "Field names: \n {field_names}\n"
                               "".format(**locals()))
@@ -219,7 +219,7 @@ class S3ToHiveTransfer(BaseOperator):
         test_field_match = [h1.lower() == h2.lower()
                             for h1, h2 in zip(header_list, field_names)]
         if not all(test_field_match):
-            self.logger.warning("Headers do not match field names"
+            self.log.warning("Headers do not match field names"
                               "File headers:\n {header_list}\n"
                               "Field names: \n {field_names}\n"
                               "".format(**locals()))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index ea301dc..b5c43c2 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -15,7 +15,7 @@
 from __future__ import print_function
 from future import standard_library
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 standard_library.install_aliases()
 from builtins import str
@@ -82,7 +82,7 @@ class BaseSensorOperator(BaseOperator):
                 else:
                     raise AirflowSensorTimeout('Snap. Time is OUT.')
             sleep(self.poke_interval)
-        self.logger.info("Success criteria met. Exiting.")
+        self.log.info("Success criteria met. Exiting.")
 
 
 class SqlSensor(BaseSensorOperator):
@@ -108,7 +108,7 @@ class SqlSensor(BaseSensorOperator):
     def poke(self, context):
         hook = BaseHook.get_connection(self.conn_id).get_hook()
 
-        self.logger.info('Poking: %s', self.sql)
+        self.log.info('Poking: %s', self.sql)
         records = hook.get_records(self.sql)
         if not records:
             return False
@@ -237,7 +237,7 @@ class ExternalTaskSensor(BaseSensorOperator):
         serialized_dttm_filter = ','.join(
             [datetime.isoformat() for datetime in dttm_filter])
 
-        self.logger.info(
+        self.log.info(
             'Poking for '
             '{self.external_dag_id}.'
             '{self.external_task_id} on '
@@ -313,7 +313,7 @@ class NamedHivePartitionSensor(BaseSensorOperator):
 
             schema, table, partition = self.parse_partition_name(partition)
 
-            self.logger.info(
+            self.log.info(
                 'Poking for {schema}.{table}/{partition}'.format(**locals())
             )
             return self.hook.check_for_named_partition(
@@ -371,7 +371,7 @@ class HivePartitionSensor(BaseSensorOperator):
     def poke(self, context):
         if '.' in self.table:
             self.schema, self.table = self.table.split('.')
-        self.logger.info(
+        self.log.info(
             'Poking for table {self.schema}.{self.table}, '
             'partition {self.partition}'.format(**locals()))
         if not hasattr(self, 'hook'):
@@ -417,7 +417,7 @@ class HdfsSensor(BaseSensorOperator):
         :return: (bool) depending on the matching criteria
         """
         if size:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
             size *= settings.MEGABYTE
             result = [x for x in result if x['length'] >= size]
@@ -435,7 +435,7 @@ class HdfsSensor(BaseSensorOperator):
         :return: (list) of dicts which were not removed
         """
         if ignore_copying:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext)
             ignored_extentions_regex = re.compile(regex_builder)
             log.debug(
@@ -448,20 +448,20 @@ class HdfsSensor(BaseSensorOperator):
 
     def poke(self, context):
         sb = self.hook(self.hdfs_conn_id).get_conn()
-        self.logger.info('Poking for file {self.filepath}'.format(**locals()))
+        self.log.info('Poking for file {self.filepath}'.format(**locals()))
         try:
             # IMOO it's not right here, as there no raise of any kind.
             # if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*',
             # it's not correct as the directory exists and sb does not raise any error
             # here is a quick fix
             result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
-            self.logger.debug('HdfsSensor.poke: result is %s', result)
+            self.log.debug('HdfsSensor.poke: result is %s', result)
             result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
             result = self.filter_for_filesize(result, self.file_size)
             return bool(result)
         except:
             e = sys.exc_info()
-            self.logger.debug("Caught an exception !: %s", str(e))
+            self.log.debug("Caught an exception !: %s", str(e))
             return False
 
 
@@ -484,7 +484,7 @@ class WebHdfsSensor(BaseSensorOperator):
     def poke(self, context):
         from airflow.hooks.webhdfs_hook import WebHDFSHook
         c = WebHDFSHook(self.webhdfs_conn_id)
-        self.logger.info('Poking for file {self.filepath}'.format(**locals()))
+        self.log.info('Poking for file {self.filepath}'.format(**locals()))
         return c.check_for_path(hdfs_path=self.filepath)
 
 
@@ -535,7 +535,7 @@ class S3KeySensor(BaseSensorOperator):
         from airflow.hooks.S3_hook import S3Hook
         hook = S3Hook(s3_conn_id=self.s3_conn_id)
         full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
-        self.logger.info('Poking for key : {full_url}'.format(**locals()))
+        self.log.info('Poking for key : {full_url}'.format(**locals()))
         if self.wildcard_match:
             return hook.check_for_wildcard_key(self.bucket_key,
                                                self.bucket_name)
@@ -577,7 +577,7 @@ class S3PrefixSensor(BaseSensorOperator):
         self.s3_conn_id = s3_conn_id
 
     def poke(self, context):
-        self.logger.info('Poking for prefix : {self.prefix}\n'
+        self.log.info('Poking for prefix : {self.prefix}\n'
                      'in bucket s3://{self.bucket_name}'.format(**locals()))
         from airflow.hooks.S3_hook import S3Hook
         hook = S3Hook(s3_conn_id=self.s3_conn_id)
@@ -602,7 +602,7 @@ class TimeSensor(BaseSensorOperator):
         self.target_time = target_time
 
     def poke(self, context):
-        self.logger.info('Checking if the time (%s) has come', self.target_time)
+        self.log.info('Checking if the time (%s) has come', self.target_time)
         return datetime.now().time() > self.target_time
 
 
@@ -627,7 +627,7 @@ class TimeDeltaSensor(BaseSensorOperator):
         dag = context['dag']
         target_dttm = dag.following_schedule(context['execution_date'])
         target_dttm += self.delta
-        self.logger.info('Checking if the time (%s) has come', target_dttm)
+        self.log.info('Checking if the time (%s) has come', target_dttm)
         return datetime.now() > target_dttm
 
 
@@ -679,7 +679,7 @@ class HttpSensor(BaseSensorOperator):
             http_conn_id=http_conn_id)
 
     def poke(self, context):
-        self.logger.info('Poking: %s', self.endpoint)
+        self.log.info('Poking: %s', self.endpoint)
         try:
             response = self.hook.run(self.endpoint,
                                      data=self.request_params,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/slack_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py
index 4f2d7bc..8b21211 100644
--- a/airflow/operators/slack_operator.py
+++ b/airflow/operators/slack_operator.py
@@ -67,7 +67,7 @@ class SlackAPIOperator(BaseOperator):
         rc = sc.api_call(self.method, **self.api_params)
         if not rc['ok']:
             msg = "Slack API call failed (%s)".format(rc['error'])
-            self.logger.error(msg)
+            self.log.error(msg)
             raise AirflowException(msg)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/sqlite_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py
index 7c85847..b32837d 100644
--- a/airflow/operators/sqlite_operator.py
+++ b/airflow/operators/sqlite_operator.py
@@ -41,6 +41,6 @@ class SqliteOperator(BaseOperator):
         self.parameters = parameters or []
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         hook = SqliteHook(sqlite_conn_id=self.sqlite_conn_id)
         hook.run(self.sql, parameters=self.parameters)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/plugins_manager.py
----------------------------------------------------------------------
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 7c1d246..d5c3407 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -25,9 +25,9 @@ import re
 import sys
 
 from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 class AirflowPluginException(Exception):
     pass

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/security/kerberos.py
----------------------------------------------------------------------
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index a9687b3..7a169b2 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -24,7 +24,7 @@ from airflow import configuration, LoggingMixin
 
 NEED_KRB181_WORKAROUND = None
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def renew_from_kt():

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index cf1eca4..1e5e614 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -27,9 +27,9 @@ from sqlalchemy.orm import scoped_session, sessionmaker
 from sqlalchemy.pool import NullPool
 
 from airflow import configuration as conf
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 class DummyStatsLogger(object):
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index 7794f4a..6a07db2 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -19,7 +19,7 @@ import json
 import subprocess
 import threading
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 from airflow import configuration as conf
 from tempfile import mkstemp
@@ -39,7 +39,7 @@ class BaseTaskRunner(LoggingMixin):
         """
         # Pass task instance context into log handlers to setup the logger.
         self._task_instance = local_task_job.task_instance
-        self.set_logger_contexts(self._task_instance)
+        self.set_log_contexts(self._task_instance)
 
         popen_prepend = []
         cfg_path = None
@@ -54,7 +54,7 @@ class BaseTaskRunner(LoggingMixin):
         # Add sudo commands to change user if we need to. Needed to handle SubDagOperator
         # case using a SequentialExecutor.
         if self.run_as_user and (self.run_as_user != getpass.getuser()):
-            self.logger.debug("Planning to run as the %s user", self.run_as_user)
+            self.log.debug("Planning to run as the %s user", self.run_as_user)
             cfg_dict = conf.as_dict(display_sensitive=True)
             cfg_subset = {
                 'core': cfg_dict.get('core', {}),
@@ -95,7 +95,7 @@ class BaseTaskRunner(LoggingMixin):
                 line = line.decode('utf-8')
             if len(line) == 0:
                 break
-            self.logger.info('Subtask: %s', line.rstrip('\n'))
+            self.log.info('Subtask: %s', line.rstrip('\n'))
 
     def run_command(self, run_with, join_args=False):
         """
@@ -112,7 +112,7 @@ class BaseTaskRunner(LoggingMixin):
         """
         cmd = [" ".join(self._command)] if join_args else self._command
         full_cmd = run_with + cmd
-        self.logger.info('Running: %s', full_cmd)
+        self.log.info('Running: %s', full_cmd)
         proc = subprocess.Popen(
             full_cmd,
             stdout=subprocess.PIPE,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/task_runner/bash_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/bash_task_runner.py b/airflow/task_runner/bash_task_runner.py
index b73e258..109b44c 100644
--- a/airflow/task_runner/bash_task_runner.py
+++ b/airflow/task_runner/bash_task_runner.py
@@ -33,7 +33,7 @@ class BashTaskRunner(BaseTaskRunner):
 
     def terminate(self):
         if self.process and psutil.pid_exists(self.process.pid):
-            kill_process_tree(self.logger, self.process.pid)
+            kill_process_tree(self.log, self.process.pid)
 
     def on_finish(self):
         super(BashTaskRunner, self).on_finish()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 6497fcc..cc64f68 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -27,7 +27,7 @@ from datetime import datetime
 
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SimpleDag(BaseDag):
@@ -205,7 +205,7 @@ def list_py_file_paths(directory, safe_mode=True):
 
                     file_paths.append(file_path)
                 except Exception:
-                    log = LoggingMixin().logger
+                    log = LoggingMixin().log
                     log.exception("Error while examining %s", f)
     return file_paths
 
@@ -443,7 +443,7 @@ class DagFileProcessorManager(LoggingMixin):
             if file_path in new_file_paths:
                 filtered_processors[file_path] = processor
             else:
-                self.logger.warning("Stopping processor for %s", file_path)
+                self.log.warning("Stopping processor for %s", file_path)
                 processor.stop()
         self._processors = filtered_processors
 
@@ -519,7 +519,7 @@ class DagFileProcessorManager(LoggingMixin):
                     os.symlink(log_directory, latest_log_directory_path)
             elif (os.path.isdir(latest_log_directory_path) or
                     os.path.isfile(latest_log_directory_path)):
-                self.logger.warning(
+                self.log.warning(
                     "%s already exists as a dir/file. Skip creating symlink.",
                     latest_log_directory_path
                 )
@@ -558,7 +558,7 @@ class DagFileProcessorManager(LoggingMixin):
 
         for file_path, processor in self._processors.items():
             if processor.done:
-                self.logger.info("Processor for %s finished", file_path)
+                self.log.info("Processor for %s finished", file_path)
                 now = datetime.now()
                 finished_processors[file_path] = processor
                 self._last_runtime[file_path] = (now -
@@ -573,7 +573,7 @@ class DagFileProcessorManager(LoggingMixin):
         simple_dags = []
         for file_path, processor in finished_processors.items():
             if processor.result is None:
-                self.logger.warning(
+                self.log.warning(
                     "Processor for %s exited with return code %s. See %s for details.",
                     processor.file_path, processor.exit_code, processor.log_file
                 )
@@ -606,12 +606,12 @@ class DagFileProcessorManager(LoggingMixin):
                                         set(files_paths_at_run_limit))
 
             for file_path, processor in self._processors.items():
-                self.logger.debug(
+                self.log.debug(
                     "File path %s is still being processed (started: %s)",
                     processor.file_path, processor.start_time.isoformat()
                 )
 
-            self.logger.debug(
+            self.log.debug(
                 "Queuing the following files for processing:\n\t%s",
                 "\n\t".join(files_paths_to_queue)
             )
@@ -626,7 +626,7 @@ class DagFileProcessorManager(LoggingMixin):
             processor = self._processor_factory(file_path, log_file_path)
 
             processor.start()
-            self.logger.info(
+            self.log.info(
                 "Started a process (PID: %s) to generate tasks for %s - logging into %s",
                 processor.pid, file_path, log_file_path
             )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index c7e58e7..ef2560f 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -25,9 +25,9 @@ from sqlalchemy import event, exc
 from sqlalchemy.pool import Pool
 
 from airflow import settings
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 def provide_session(func):
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/email.py
----------------------------------------------------------------------
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index f252d55..fadd4d5 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -31,7 +31,7 @@ from email.utils import formatdate
 
 from airflow import configuration
 from airflow.exceptions import AirflowConfigException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 def send_email(to, subject, html_content, files=None, dryrun=False, cc=None, bcc=None, mime_subtype='mixed'):
@@ -88,7 +88,7 @@ def send_email_smtp(to, subject, html_content, files=None, dryrun=False, cc=None
 
 
 def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
 
     SMTP_HOST = configuration.get('smtp', 'SMTP_HOST')
     SMTP_PORT = configuration.getint('smtp', 'SMTP_PORT')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/LoggingMixin.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/LoggingMixin.py b/airflow/utils/log/LoggingMixin.py
deleted file mode 100644
index 4572d63..0000000
--- a/airflow/utils/log/LoggingMixin.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import logging
-from builtins import object
-
-
-class LoggingMixin(object):
-    """
-    Convenience super-class to have a logger configured with the class name
-    """
-
-    @property
-    def logger(self):
-        try:
-            return self._logger
-        except AttributeError:
-            self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__)
-            return self._logger
-
-    def set_logger_contexts(self, task_instance):
-        """
-        Set the context for all handlers of current logger.
-        """
-        for handler in self.logger.handlers:
-            try:
-                handler.set_context(task_instance)
-            except AttributeError:
-                pass

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index 0bc0b5e..dcdaf6d 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -15,7 +15,7 @@ import os
 
 from airflow import configuration
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.log.file_task_handler import FileTaskHandler
 
 
@@ -40,7 +40,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
                 google_cloud_storage_conn_id=remote_conn_id
             )
         except:
-            self.logger.error(
+            self.log.error(
                 'Could not create a GoogleCloudStorageHook with connection id '
                 '"%s". Please make sure that airflow[gcp_api] is installed '
                 'and the GCS connection exists.', remote_conn_id
@@ -137,7 +137,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
             # return error if needed
             if return_error:
                 msg = 'Could not read logs from {}'.format(remote_log_location)
-                self.logger.error(msg)
+                self.log.error(msg)
                 return msg
 
     def gcs_write(self, log, remote_log_location, append=True):
@@ -167,7 +167,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
                 tmpfile.flush()
                 self.hook.upload(bkt, blob, tmpfile.name)
         except:
-            self.logger.error('Could not write logs to %s', remote_log_location)
+            self.log.error('Could not write logs to %s', remote_log_location)
 
     def parse_gcs_url(self, gsurl):
         """