You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/19 08:17:33 UTC

[4/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log

[AIRFLOW-1604] Rename logger to log

In all the popular languages the variable name log
is the de facto
standard for the logging. Rename LoggingMixin.py
to logging_mixin.py
to comply with the Python standard.

When using the .logger a deprecation warning will
be emitted.

Closes #2604 from Fokko/AIRFLOW-1604-logger-to-log


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb2f5890
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb2f5890
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb2f5890

Branch: refs/heads/master
Commit: eb2f589099b87743482c2eb16261b49e284dcd96
Parents: 8e253c7
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Tue Sep 19 10:17:14 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Sep 19 10:17:14 2017 +0200

----------------------------------------------------------------------
 airflow/__init__.py                             |   4 +-
 airflow/api/__init__.py                         |   4 +-
 airflow/api/auth/backend/kerberos_auth.py       |   4 +-
 airflow/bin/cli.py                              |  10 +-
 airflow/configuration.py                        |   4 +-
 .../auth/backends/github_enterprise_auth.py     |   4 +-
 airflow/contrib/auth/backends/google_auth.py    |   4 +-
 airflow/contrib/auth/backends/kerberos_auth.py  |   2 +-
 airflow/contrib/auth/backends/ldap_auth.py      |   4 +-
 airflow/contrib/auth/backends/password_auth.py  |   4 +-
 airflow/contrib/executors/mesos_executor.py     |  34 +--
 airflow/contrib/hooks/bigquery_hook.py          |  24 +--
 airflow/contrib/hooks/cloudant_hook.py          |   4 +-
 airflow/contrib/hooks/databricks_hook.py        |   8 +-
 airflow/contrib/hooks/datadog_hook.py           |   6 +-
 airflow/contrib/hooks/datastore_hook.py         |   4 +-
 airflow/contrib/hooks/ftp_hook.py               |   6 +-
 airflow/contrib/hooks/gcp_api_base_hook.py      |   6 +-
 airflow/contrib/hooks/gcp_dataflow_hook.py      |  14 +-
 airflow/contrib/hooks/gcp_dataproc_hook.py      |  18 +-
 airflow/contrib/hooks/gcp_mlengine_hook.py      |  18 +-
 airflow/contrib/hooks/gcs_hook.py               |   4 +-
 airflow/contrib/hooks/jira_hook.py              |   4 +-
 airflow/contrib/hooks/qubole_hook.py            |  12 +-
 airflow/contrib/hooks/redis_hook.py             |   6 +-
 airflow/contrib/hooks/salesforce_hook.py        |  14 +-
 airflow/contrib/hooks/spark_sql_hook.py         |   6 +-
 airflow/contrib/hooks/spark_submit_hook.py      |  14 +-
 airflow/contrib/hooks/sqoop_hook.py             |  10 +-
 airflow/contrib/hooks/ssh_hook.py               |  14 +-
 airflow/contrib/operators/bigquery_operator.py  |   2 +-
 .../operators/bigquery_table_delete_operator.py |   2 +-
 .../contrib/operators/bigquery_to_bigquery.py   |   2 +-
 airflow/contrib/operators/bigquery_to_gcs.py    |   6 +-
 .../contrib/operators/databricks_operator.py    |  12 +-
 airflow/contrib/operators/dataproc_operator.py  |  16 +-
 .../operators/datastore_export_operator.py      |   2 +-
 .../operators/datastore_import_operator.py      |   2 +-
 airflow/contrib/operators/ecs_operator.py       |  12 +-
 .../contrib/operators/emr_add_steps_operator.py |   4 +-
 .../operators/emr_create_job_flow_operator.py   |   4 +-
 .../emr_terminate_job_flow_operator.py          |   4 +-
 airflow/contrib/operators/file_to_wasb.py       |   2 +-
 airflow/contrib/operators/fs_operator.py        |   2 +-
 .../contrib/operators/gcs_download_operator.py  |   4 +-
 airflow/contrib/operators/gcs_to_bq.py          |   2 +-
 airflow/contrib/operators/hipchat_operator.py   |   4 +-
 airflow/contrib/operators/mlengine_operator.py  |  14 +-
 airflow/contrib/operators/mysql_to_gcs.py       |   2 +-
 airflow/contrib/operators/sftp_operator.py      |   4 +-
 airflow/contrib/operators/vertica_operator.py   |   2 +-
 airflow/contrib/operators/vertica_to_hive.py    |   4 +-
 airflow/contrib/sensors/bigquery_sensor.py      |   2 +-
 airflow/contrib/sensors/datadog_sensor.py       |   2 +-
 airflow/contrib/sensors/emr_base_sensor.py      |   4 +-
 airflow/contrib/sensors/emr_job_flow_sensor.py  |   2 +-
 airflow/contrib/sensors/emr_step_sensor.py      |   2 +-
 airflow/contrib/sensors/ftp_sensor.py           |   2 +-
 airflow/contrib/sensors/gcs_sensor.py           |   4 +-
 airflow/contrib/sensors/hdfs_sensors.py         |   6 +-
 airflow/contrib/sensors/jira_sensor.py          |  14 +-
 airflow/contrib/sensors/redis_key_sensor.py     |   2 +-
 airflow/contrib/sensors/wasb_sensor.py          |   4 +-
 .../contrib/task_runner/cgroup_task_runner.py   |  20 +-
 airflow/executors/__init__.py                   |   4 +-
 airflow/executors/base_executor.py              |  14 +-
 airflow/executors/celery_executor.py            |  16 +-
 airflow/executors/dask_executor.py              |   4 +-
 airflow/executors/local_executor.py             |   6 +-
 airflow/executors/sequential_executor.py        |   4 +-
 airflow/hooks/S3_hook.py                        |  10 +-
 airflow/hooks/base_hook.py                      |   4 +-
 airflow/hooks/dbapi_hook.py                     |   6 +-
 airflow/hooks/druid_hook.py                     |   4 +-
 airflow/hooks/hive_hooks.py                     |  26 +--
 airflow/hooks/http_hook.py                      |   6 +-
 airflow/hooks/oracle_hook.py                    |   8 +-
 airflow/hooks/pig_hook.py                       |   4 +-
 airflow/hooks/webhdfs_hook.py                   |  12 +-
 airflow/hooks/zendesk_hook.py                   |   4 +-
 airflow/jobs.py                                 | 206 +++++++++----------
 airflow/models.py                               | 126 ++++++------
 airflow/operators/bash_operator.py              |  14 +-
 airflow/operators/check_operator.py             |  20 +-
 airflow/operators/dagrun_operator.py            |   4 +-
 airflow/operators/docker_operator.py            |  10 +-
 airflow/operators/generic_transfer.py           |  10 +-
 airflow/operators/hive_operator.py              |   2 +-
 airflow/operators/hive_stats_operator.py        |   8 +-
 airflow/operators/hive_to_druid.py              |  10 +-
 airflow/operators/hive_to_mysql.py              |  10 +-
 airflow/operators/hive_to_samba_operator.py     |   4 +-
 airflow/operators/http_operator.py              |   2 +-
 airflow/operators/jdbc_operator.py              |   2 +-
 airflow/operators/latest_only_operator.py       |  12 +-
 airflow/operators/mssql_operator.py             |   2 +-
 airflow/operators/mssql_to_hive.py              |   4 +-
 airflow/operators/mysql_operator.py             |   2 +-
 airflow/operators/mysql_to_hive.py              |   4 +-
 airflow/operators/oracle_operator.py            |   2 +-
 airflow/operators/pig_operator.py               |   2 +-
 airflow/operators/postgres_operator.py          |   2 +-
 airflow/operators/presto_to_mysql.py            |   8 +-
 airflow/operators/python_operator.py            |  36 ++--
 airflow/operators/redshift_to_s3_operator.py    |   6 +-
 airflow/operators/s3_file_transform_operator.py |  12 +-
 airflow/operators/s3_to_hive_operator.py        |  24 +--
 airflow/operators/sensors.py                    |  34 +--
 airflow/operators/slack_operator.py             |   2 +-
 airflow/operators/sqlite_operator.py            |   2 +-
 airflow/plugins_manager.py                      |   4 +-
 airflow/security/kerberos.py                    |   2 +-
 airflow/settings.py                             |   4 +-
 airflow/task_runner/base_task_runner.py         |  10 +-
 airflow/task_runner/bash_task_runner.py         |   2 +-
 airflow/utils/dag_processing.py                 |  18 +-
 airflow/utils/db.py                             |   4 +-
 airflow/utils/email.py                          |   4 +-
 airflow/utils/log/LoggingMixin.py               |  45 ----
 airflow/utils/log/gcs_task_handler.py           |   8 +-
 airflow/utils/log/logging_mixin.py              |  61 ++++++
 airflow/utils/log/s3_task_handler.py            |   8 +-
 airflow/utils/timeout.py                        |  12 +-
 airflow/www/api/experimental/endpoints.py       |   4 +-
 airflow/www/app.py                              |   2 +-
 scripts/perf/scheduler_ops_metrics.py           |   4 +-
 tests/contrib/hooks/test_databricks_hook.py     |   2 +-
 .../contrib/operators/test_dataproc_operator.py |   8 +-
 tests/contrib/sensors/test_hdfs_sensors.py      |  62 +++---
 tests/executors/test_executor.py                |   4 +-
 tests/operators/sensors.py                      |   4 +-
 tests/test_utils/reset_warning_registry.py      |  82 ++++++++
 tests/utils/log/test_logging.py                 |   6 +-
 tests/utils/test_logging_mixin.py               |  50 +++++
 134 files changed, 858 insertions(+), 708 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 8844eeb..3c5f24c 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -21,7 +21,7 @@ in their PYTHONPATH. airflow_login should be based off the
 """
 from builtins import object
 from airflow import version
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 __version__ = version.version
 
@@ -41,7 +41,7 @@ login = None
 
 
 def load_login():
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
 
     auth_backend = 'airflow.default_login'
     try:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/api/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py
index 39edbed..31a303b 100644
--- a/airflow/api/__init__.py
+++ b/airflow/api/__init__.py
@@ -17,11 +17,11 @@ from airflow.exceptions import AirflowException
 from airflow import configuration as conf
 from importlib import import_module
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 api_auth = None
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def load_auth():

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/api/auth/backend/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py
index 73a5aa2..a904d59 100644
--- a/airflow/api/auth/backend/kerberos_auth.py
+++ b/airflow/api/auth/backend/kerberos_auth.py
@@ -24,7 +24,7 @@
 
 from future.standard_library import install_aliases
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 install_aliases()
 
@@ -47,7 +47,7 @@ client_auth = HTTPKerberosAuth(service='airflow')
 
 _SERVICE_NAME = None
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def init_app(app):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 56f1855..5035a66 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -53,7 +53,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
 
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.www.app import cached_app
 
 from sqlalchemy import func
@@ -64,7 +64,7 @@ api_module = import_module(conf.get('cli', 'api_client'))
 api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
                                auth=api.api_auth.client_auth)
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def sigint_handler(sig, frame):
@@ -189,7 +189,7 @@ def trigger_dag(args):
     :param args:
     :return:
     """
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
     try:
         message = api_client.trigger_dag(dag_id=args.dag_id,
                                          run_id=args.run_id,
@@ -202,7 +202,7 @@ def trigger_dag(args):
 
 
 def pool(args):
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
 
     def _tabulate(pools):
         return "\n%s" % tabulate(pools, ['Pool', 'Slots', 'Description'],
@@ -330,7 +330,7 @@ def run(args, dag=None):
     if dag:
         args.dag_id = dag.dag_id
 
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
 
     # Load custom airflow config
     if args.cfg_path:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index db196f9..ff81d98 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -28,7 +28,7 @@ import sys
 
 from future import standard_library
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 standard_library.install_aliases()
 
@@ -38,7 +38,7 @@ from six.moves import configparser
 
 from airflow.exceptions import AirflowConfigException
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 # show Airflow's deprecation warnings
 warnings.filterwarnings(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/github_enterprise_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py
index 459e9c9..28c3cfc 100644
--- a/airflow/contrib/auth/backends/github_enterprise_auth.py
+++ b/airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -27,9 +27,9 @@ from flask_oauthlib.client import OAuth
 
 from airflow import models, configuration, settings
 from airflow.configuration import AirflowConfigException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def get_config_param(param):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/google_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py
index f38f725..e6eab94 100644
--- a/airflow/contrib/auth/backends/google_auth.py
+++ b/airflow/contrib/auth/backends/google_auth.py
@@ -26,9 +26,9 @@ from flask import url_for, redirect, request
 from flask_oauthlib.client import OAuth
 
 from airflow import models, configuration, settings
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def get_config_param(param):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py
index ffb711f..908ebc9 100644
--- a/airflow/contrib/auth/backends/kerberos_auth.py
+++ b/airflow/contrib/auth/backends/kerberos_auth.py
@@ -29,7 +29,7 @@ from flask import url_for, redirect
 from airflow import settings
 from airflow import models
 from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 login_manager = flask_login.LoginManager()
 login_manager.login_view = 'airflow.login'  # Calls login() below

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/ldap_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py
index 8ce0875..b056851 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -33,13 +33,13 @@ from airflow.configuration import AirflowConfigException
 import traceback
 import re
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 login_manager = flask_login.LoginManager()
 login_manager.login_view = 'airflow.login'  # Calls login() below
 login_manager.login_message = None
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 class AuthenticationError(Exception):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/auth/backends/password_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/password_auth.py b/airflow/contrib/auth/backends/password_auth.py
index 3ad2a8b..8adb1f4 100644
--- a/airflow/contrib/auth/backends/password_auth.py
+++ b/airflow/contrib/auth/backends/password_auth.py
@@ -32,13 +32,13 @@ from sqlalchemy.ext.hybrid import hybrid_property
 
 from airflow import settings
 from airflow import models
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 login_manager = flask_login.LoginManager()
 login_manager.login_view = 'airflow.login'  # Calls login() below
 login_manager.login_message = None
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 PY3 = version_info[0] == 3
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/executors/mesos_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py
index 19d72ed..8728566 100644
--- a/airflow/contrib/executors/mesos_executor.py
+++ b/airflow/contrib/executors/mesos_executor.py
@@ -14,7 +14,7 @@
 
 from future import standard_library
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.www.utils import LoginMixin
 
 standard_library.install_aliases()
@@ -65,7 +65,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
         self.task_key_map = {}
 
     def registered(self, driver, frameworkId, masterInfo):
-        self.logger.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value)
+        self.log.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value)
 
         if configuration.getboolean('mesos', 'CHECKPOINT') and configuration.get('mesos', 'FAILOVER_TIMEOUT'):
             # Import here to work around a circular import error
@@ -86,25 +86,25 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
             Session.remove()
 
     def reregistered(self, driver, masterInfo):
-        self.logger.info("AirflowScheduler re-registered to mesos")
+        self.log.info("AirflowScheduler re-registered to mesos")
 
     def disconnected(self, driver):
-        self.logger.info("AirflowScheduler disconnected from mesos")
+        self.log.info("AirflowScheduler disconnected from mesos")
 
     def offerRescinded(self, driver, offerId):
-        self.logger.info("AirflowScheduler offer %s rescinded", str(offerId))
+        self.log.info("AirflowScheduler offer %s rescinded", str(offerId))
 
     def frameworkMessage(self, driver, executorId, slaveId, message):
-        self.logger.info("AirflowScheduler received framework message %s", message)
+        self.log.info("AirflowScheduler received framework message %s", message)
 
     def executorLost(self, driver, executorId, slaveId, status):
-        self.logger.warning("AirflowScheduler executor %s lost", str(executorId))
+        self.log.warning("AirflowScheduler executor %s lost", str(executorId))
 
     def slaveLost(self, driver, slaveId):
-        self.logger.warning("AirflowScheduler slave %s lost", str(slaveId))
+        self.log.warning("AirflowScheduler slave %s lost", str(slaveId))
 
     def error(self, driver, message):
-        self.logger.error("AirflowScheduler driver aborted %s", message)
+        self.log.error("AirflowScheduler driver aborted %s", message)
         raise AirflowException("AirflowScheduler driver aborted %s" % message)
 
     def resourceOffers(self, driver, offers):
@@ -118,7 +118,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
                 elif resource.name == "mem":
                     offerMem += resource.scalar.value
 
-            self.logger.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
+            self.log.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
 
             remainingCpus = offerCpus
             remainingMem = offerMem
@@ -131,7 +131,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
                 self.task_counter += 1
                 self.task_key_map[str(tid)] = key
 
-                self.logger.info("Launching task %d using offer %s", tid, offer.id.value)
+                self.log.info("Launching task %d using offer %s", tid, offer.id.value)
 
                 task = mesos_pb2.TaskInfo()
                 task.task_id.value = str(tid)
@@ -161,7 +161,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
             driver.launchTasks(offer.id, tasks)
 
     def statusUpdate(self, driver, update):
-        self.logger.info(
+        self.log.info(
             "Task %s is in state %s, data %s",
             update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data)
         )
@@ -171,7 +171,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
         except KeyError:
             # The map may not contain an item if the framework re-registered after a failover.
             # Discard these tasks.
-            self.logger.warning("Unrecognised task key %s", update.task_id.value)
+            self.log.warning("Unrecognised task key %s", update.task_id.value)
             return
 
         if update.state == mesos_pb2.TASK_FINISHED:
@@ -203,7 +203,7 @@ class MesosExecutor(BaseExecutor, LoginMixin):
         framework.user = ''
 
         if not configuration.get('mesos', 'MASTER'):
-            self.logger.error("Expecting mesos master URL for mesos executor")
+            self.log.error("Expecting mesos master URL for mesos executor")
             raise AirflowException("mesos.master not provided for mesos executor")
 
         master = configuration.get('mesos', 'MASTER')
@@ -239,7 +239,7 @@ class MesosExecutor(BaseExecutor, LoginMixin):
         else:
             framework.checkpoint = False
 
-        self.logger.info(
+        self.log.info(
             'MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s',
             master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint)
         )
@@ -248,10 +248,10 @@ class MesosExecutor(BaseExecutor, LoginMixin):
 
         if configuration.getboolean('mesos', 'AUTHENTICATE'):
             if not configuration.get('mesos', 'DEFAULT_PRINCIPAL'):
-                self.logger.error("Expecting authentication principal in the environment")
+                self.log.error("Expecting authentication principal in the environment")
                 raise AirflowException("mesos.default_principal not provided in authenticated mode")
             if not configuration.get('mesos', 'DEFAULT_SECRET'):
-                self.logger.error("Expecting authentication secret in the environment")
+                self.log.error("Expecting authentication secret in the environment")
                 raise AirflowException("mesos.default_secret not provided in authenticated mode")
 
             credential = mesos_pb2.Credential()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 497fa28..5fc7e22 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -32,7 +32,7 @@ from past.builtins import basestring
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 from airflow.hooks.dbapi_hook import DbApiHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
@@ -499,7 +499,7 @@ class BigQueryBaseCursor(LoggingMixin):
                     "'WRITE_APPEND' or 'WRITE_TRUNCATE'."
                 )
             else:
-                self.logger.info(
+                self.log.info(
                     "Adding experimental "
                     "'schemaUpdateOptions': {0}".format(schema_update_options)
                 )
@@ -576,12 +576,12 @@ class BigQueryBaseCursor(LoggingMixin):
                             )
                         )
                 else:
-                    self.logger.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
+                    self.log.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
                     time.sleep(5)
 
             except HttpError as err:
                 if err.resp.status in [500, 503]:
-                    self.logger.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
+                    self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
                     time.sleep(5)
                 else:
                     raise Exception(
@@ -660,14 +660,14 @@ class BigQueryBaseCursor(LoggingMixin):
                         datasetId=deletion_dataset,
                         tableId=deletion_table) \
                 .execute()
-            self.logger.info('Deleted table %s:%s.%s.',
-                         deletion_project, deletion_dataset, deletion_table)
+            self.log.info('Deleted table %s:%s.%s.',
+                          deletion_project, deletion_dataset, deletion_table)
         except HttpError:
             if not ignore_if_missing:
                 raise Exception(
                     'Table deletion failed. Table does not exist.')
             else:
-                self.logger.info('Table does not exist. Skipping.')
+                self.log.info('Table does not exist. Skipping.')
 
 
     def run_table_upsert(self, dataset_id, table_resource, project_id=None):
@@ -694,7 +694,7 @@ class BigQueryBaseCursor(LoggingMixin):
             for table in tables_list_resp.get('tables', []):
                 if table['tableReference']['tableId'] == table_id:
                     # found the table, do update
-                    self.logger.info(
+                    self.log.info(
                         'Table %s:%s.%s exists, updating.',
                         project_id, dataset_id, table_id
                     )
@@ -712,7 +712,7 @@ class BigQueryBaseCursor(LoggingMixin):
             # If there is no next page, then the table doesn't exist.
             else:
                 # do insert
-                self.logger.info(
+                self.log.info(
                     'Table %s:%s.%s does not exist. creating.',
                     project_id, dataset_id, table_id
                 )
@@ -759,7 +759,7 @@ class BigQueryBaseCursor(LoggingMixin):
                                 'tableId': view_table}}
         # check to see if the view we want to add already exists.
         if view_access not in access:
-            self.logger.info(
+            self.log.info(
                 'Granting table %s:%s.%s authorized view access to %s:%s dataset.',
                 view_project, view_dataset, view_table, source_project, source_dataset
             )
@@ -769,7 +769,7 @@ class BigQueryBaseCursor(LoggingMixin):
                                                  body={'access': access}).execute()
         else:
             # if view is already in access, do nothing.
-            self.logger.info(
+            self.log.info(
                 'Table %s:%s.%s already has authorized view access to %s:%s dataset.',
                 view_project, view_dataset, view_table, source_project, source_dataset
             )
@@ -1032,7 +1032,7 @@ def _split_tablename(table_input, default_project_id, var_name=None):
 
     if project_id is None:
         if var_name is not None:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.info(
                 'Project not included in {var}: {input}; using project "{project}"'.format(
                     var=var_name, input=table_input, project=default_project_id

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/cloudant_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/cloudant_hook.py b/airflow/contrib/hooks/cloudant_hook.py
index d9db08d..cbb0cca 100644
--- a/airflow/contrib/hooks/cloudant_hook.py
+++ b/airflow/contrib/hooks/cloudant_hook.py
@@ -18,7 +18,7 @@ import cloudant
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class CloudantHook(BaseHook):
@@ -35,7 +35,7 @@ class CloudantHook(BaseHook):
         def _str(s):
             # cloudant-python doesn't support unicode.
             if isinstance(s, unicode):
-                log = LoggingMixin().logger
+                log = LoggingMixin().log
                 log.debug(
                     'cloudant-python does not support unicode. Encoding %s as ascii using "ignore".',
                     s

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/databricks_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/databricks_hook.py b/airflow/contrib/hooks/databricks_hook.py
index 7b20433..cd9dc54 100644
--- a/airflow/contrib/hooks/databricks_hook.py
+++ b/airflow/contrib/hooks/databricks_hook.py
@@ -20,7 +20,7 @@ from airflow.hooks.base_hook import BaseHook
 from requests import exceptions as requests_exceptions
 from requests.auth import AuthBase
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 try:
     from urllib import parse as urlparse
@@ -100,10 +100,10 @@ class DatabricksHook(BaseHook, LoggingMixin):
             host=self._parse_host(self.databricks_conn.host),
             endpoint=endpoint)
         if 'token' in self.databricks_conn.extra_dejson:
-            self.logger.info('Using token auth.')
+            self.log.info('Using token auth.')
             auth = _TokenAuth(self.databricks_conn.extra_dejson['token'])
         else:
-            self.logger.info('Using basic auth.')
+            self.log.info('Using basic auth.')
             auth = (self.databricks_conn.login, self.databricks_conn.password)
         if method == 'GET':
             request_func = requests.get
@@ -129,7 +129,7 @@ class DatabricksHook(BaseHook, LoggingMixin):
                         response.content, response.status_code))
             except (requests_exceptions.ConnectionError,
                     requests_exceptions.Timeout) as e:
-                self.logger.error(
+                self.log.error(
                     'Attempt %s API Request to Databricks failed with reason: %s',
                     attempt_num, e
                 )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/datadog_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datadog_hook.py b/airflow/contrib/hooks/datadog_hook.py
index 0f5af00..6caf611 100644
--- a/airflow/contrib/hooks/datadog_hook.py
+++ b/airflow/contrib/hooks/datadog_hook.py
@@ -17,7 +17,7 @@ from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
 from datadog import initialize, api
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class DatadogHook(BaseHook, LoggingMixin):
@@ -47,7 +47,7 @@ class DatadogHook(BaseHook, LoggingMixin):
         if self.app_key is None:
             raise AirflowException("app_key must be specified in the Datadog connection details")
 
-        self.logger.info("Setting up api keys for Datadog")
+        self.log.info("Setting up api keys for Datadog")
         options = {
             'api_key': self.api_key,
             'app_key': self.app_key
@@ -56,7 +56,7 @@ class DatadogHook(BaseHook, LoggingMixin):
 
     def validate_response(self, response):
         if response['status'] != 'ok':
-            self.logger.error("Datadog returned: %s", response)
+            self.log.error("Datadog returned: %s", response)
             raise AirflowException("Error status received from Datadog")
 
     def send_metric(self, metric_name, datapoint, tags=None):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/datastore_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datastore_hook.py b/airflow/contrib/hooks/datastore_hook.py
index 2ff1600..cf98dc7 100644
--- a/airflow/contrib/hooks/datastore_hook.py
+++ b/airflow/contrib/hooks/datastore_hook.py
@@ -136,8 +136,8 @@ class DatastoreHook(GoogleCloudBaseHook):
             result = self.get_operation(name)
             state = result['metadata']['common']['state']
             if state == 'PROCESSING':
-                self.logger.info('Operation is processing. Re-polling state in {} seconds'
-                        .format(polling_interval_in_seconds))
+                self.log.info('Operation is processing. Re-polling state in {} seconds'
+                              .format(polling_interval_in_seconds))
                 time.sleep(polling_interval_in_seconds)
             else:
                 return result

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/ftp_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ftp_hook.py b/airflow/contrib/hooks/ftp_hook.py
index a6b3181..b1e224d 100644
--- a/airflow/contrib/hooks/ftp_hook.py
+++ b/airflow/contrib/hooks/ftp_hook.py
@@ -19,7 +19,7 @@ import os.path
 from airflow.hooks.base_hook import BaseHook
 from past.builtins import basestring
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 def mlsd(conn, path="", facts=None):
@@ -167,9 +167,9 @@ class FTPHook(BaseHook, LoggingMixin):
 
         remote_path, remote_file_name = os.path.split(remote_full_path)
         conn.cwd(remote_path)
-        self.logger.info('Retrieving file from FTP: %s', remote_full_path)
+        self.log.info('Retrieving file from FTP: %s', remote_full_path)
         conn.retrbinary('RETR %s' % remote_file_name, output_handle.write)
-        self.logger.info('Finished retrieving file from FTP: %s', remote_full_path)
+        self.log.info('Finished retrieving file from FTP: %s', remote_full_path)
 
         if is_path:
             output_handle.close()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_api_base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py
index 7476c90..28721d3 100644
--- a/airflow/contrib/hooks/gcp_api_base_hook.py
+++ b/airflow/contrib/hooks/gcp_api_base_hook.py
@@ -18,7 +18,7 @@ from oauth2client.service_account import ServiceAccountCredentials
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class GoogleCloudBaseHook(BaseHook, LoggingMixin):
@@ -66,7 +66,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
             kwargs['sub'] = self.delegate_to
 
         if not key_path:
-            self.logger.info('Getting connection using `gcloud auth` user, since no key file '
+            self.log.info('Getting connection using `gcloud auth` user, since no key file '
                          'is defined for hook.')
             credentials = GoogleCredentials.get_application_default()
         else:
@@ -74,7 +74,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
                 raise AirflowException('Scope should be defined when using a key file.')
             scopes = [s.strip() for s in scope.split(',')]
             if key_path.endswith('.json'):
-                self.logger.info('Getting connection using a JSON key file.')
+                self.log.info('Getting connection using a JSON key file.')
                 credentials = ServiceAccountCredentials\
                     .from_json_keyfile_name(key_path, scopes)
             elif key_path.endswith('.p12'):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index f5767bd..b1a1e0e 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -19,7 +19,7 @@ import uuid
 from apiclient.discovery import build
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class _DataflowJob(LoggingMixin):
@@ -48,12 +48,12 @@ class _DataflowJob(LoggingMixin):
             job = self._dataflow.projects().jobs().get(projectId=self._project_number,
                                                        jobId=self._job_id).execute()
         if 'currentState' in job:
-            self.logger.info(
+            self.log.info(
                 'Google Cloud DataFlow job %s is %s',
                 job['name'], job['currentState']
             )
         else:
-            self.logger.info(
+            self.log.info(
                 'Google Cloud DataFlow with job_id %s has name %s',
                 self._job_id, job['name']
             )
@@ -75,7 +75,7 @@ class _DataflowJob(LoggingMixin):
                 elif 'JOB_STATE_PENDING' == self._job['currentState']:
                     time.sleep(15)
                 else:
-                    self.logger.debug(str(self._job))
+                    self.log.debug(str(self._job))
                     raise Exception(
                         "Google Cloud Dataflow job {} was unknown state: {}".format(
                             self._job['name'], self._job['currentState']))
@@ -109,15 +109,15 @@ class _Dataflow(LoggingMixin):
 
     def wait_for_done(self):
         reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
-        self.logger.info("Start waiting for DataFlow process to complete.")
+        self.log.info("Start waiting for DataFlow process to complete.")
         while self._proc.poll() is None:
             ret = select.select(reads, [], [], 5)
             if ret is not None:
                 for fd in ret[0]:
                     line = self._line(fd)
-                    self.logger.debug(line[:-1])
+                    self.log.debug(line[:-1])
             else:
-                self.logger.info("Waiting for DataFlow process to complete.")
+                self.log.info("Waiting for DataFlow process to complete.")
         if self._proc.returncode is not 0:
             raise Exception("DataFlow failed with return code {}".format(
                 self._proc.returncode))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index 3a1336e..c964f4c 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -18,7 +18,7 @@ import uuid
 from apiclient.discovery import build
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class _DataProcJob(LoggingMixin):
@@ -30,7 +30,7 @@ class _DataProcJob(LoggingMixin):
             region='global',
             body=job).execute()
         self.job_id = self.job['reference']['jobId']
-        self.logger.info(
+        self.log.info(
             'DataProc job %s is %s',
             self.job_id, str(self.job['status']['state'])
         )
@@ -43,20 +43,20 @@ class _DataProcJob(LoggingMixin):
                 jobId=self.job_id).execute()
             if 'ERROR' == self.job['status']['state']:
                 print(str(self.job))
-                self.logger.error('DataProc job %s has errors', self.job_id)
-                self.logger.error(self.job['status']['details'])
-                self.logger.debug(str(self.job))
+                self.log.error('DataProc job %s has errors', self.job_id)
+                self.log.error(self.job['status']['details'])
+                self.log.debug(str(self.job))
                 return False
             if 'CANCELLED' == self.job['status']['state']:
                 print(str(self.job))
-                self.logger.warning('DataProc job %s is cancelled', self.job_id)
+                self.log.warning('DataProc job %s is cancelled', self.job_id)
                 if 'details' in self.job['status']:
-                    self.logger.warning(self.job['status']['details'])
-                self.logger.debug(str(self.job))
+                    self.log.warning(self.job['status']['details'])
+                self.log.debug(str(self.job))
                 return False
             if 'DONE' == self.job['status']['state']:
                 return True
-            self.logger.debug(
+            self.log.debug(
                 'DataProc job %s is %s',
                 self.job_id, str(self.job['status']['state'])
             )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcp_mlengine_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_mlengine_hook.py b/airflow/contrib/hooks/gcp_mlengine_hook.py
index 35f31a7..c17b614 100644
--- a/airflow/contrib/hooks/gcp_mlengine_hook.py
+++ b/airflow/contrib/hooks/gcp_mlengine_hook.py
@@ -20,11 +20,11 @@ from apiclient.discovery import build
 from oauth2client.client import GoogleCredentials
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 def _poll_with_exponential_delay(request, max_n, is_done_func, is_error_func):
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
 
     for i in range(0, max_n):
         try:
@@ -103,18 +103,18 @@ class MLEngineHook(GoogleCloudBaseHook):
                 if use_existing_job_fn is not None:
                     existing_job = self._get_job(project_id, job_id)
                     if not use_existing_job_fn(existing_job):
-                        self.logger.error(
+                        self.log.error(
                             'Job with job_id %s already exist, but it does '
                             'not match our expectation: %s',
                             job_id, existing_job
                         )
                         raise
-                self.logger.info(
+                self.log.info(
                     'Job with job_id %s already exist. Will waiting for it to finish',
                     job_id
                 )
             else:
-                self.logger.error('Failed to create MLEngine job: {}'.format(e))
+                self.log.error('Failed to create MLEngine job: {}'.format(e))
                 raise
 
         return self._wait_for_job_done(project_id, job_id)
@@ -139,7 +139,7 @@ class MLEngineHook(GoogleCloudBaseHook):
                     # polling after 30 seconds when quota failure occurs
                     time.sleep(30)
                 else:
-                    self.logger.error('Failed to get MLEngine job: {}'.format(e))
+                    self.log.error('Failed to get MLEngine job: {}'.format(e))
                     raise
 
     def _wait_for_job_done(self, project_id, job_id, interval=30):
@@ -191,10 +191,10 @@ class MLEngineHook(GoogleCloudBaseHook):
 
         try:
             response = request.execute()
-            self.logger.info('Successfully set version: %s to default', response)
+            self.log.info('Successfully set version: %s to default', response)
             return response
         except errors.HttpError as e:
-            self.logger.error('Something went wrong: %s', e)
+            self.log.error('Something went wrong: %s', e)
             raise
 
     def list_versions(self, project_id, model_name):
@@ -262,6 +262,6 @@ class MLEngineHook(GoogleCloudBaseHook):
             return request.execute()
         except errors.HttpError as e:
             if e.resp.status == 404:
-                self.logger.error('Model was not found: %s', e)
+                self.log.error('Model was not found: %s', e)
                 return None
             raise

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index eb17c3b..24c247e 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -182,7 +182,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
                     ts = ts.replace(tzinfo=dateutil.tz.tzutc())
 
                 updated = dateutil.parser.parse(response['updated'])
-                self.logger.info("Verify object date: %s > %s", updated, ts)
+                self.log.info("Verify object date: %s > %s", updated, ts)
 
                 if updated > ts:
                     return True
@@ -247,7 +247,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
             ).execute()
 
             if 'items' not in response:
-                self.logger.info("No items found for prefix: %s", prefix)
+                self.log.info("No items found for prefix: %s", prefix)
                 break
 
             for item in response['items']:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/jira_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/jira_hook.py b/airflow/contrib/hooks/jira_hook.py
index 8702608..21e669f 100644
--- a/airflow/contrib/hooks/jira_hook.py
+++ b/airflow/contrib/hooks/jira_hook.py
@@ -16,7 +16,7 @@ from jira.exceptions import JIRAError
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class JiraHook(BaseHook, LoggingMixin):
@@ -35,7 +35,7 @@ class JiraHook(BaseHook, LoggingMixin):
 
     def get_conn(self):
         if not self.client:
-            self.logger.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
+            self.log.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
 
             get_server_info = True
             validate = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 1a5e7ec..833c1c7 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -21,7 +21,7 @@ import six
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
 from qds_sdk.qubole import Qubole
@@ -86,7 +86,7 @@ class QuboleHook(BaseHook, LoggingMixin):
         if cmd_id is not None:
             cmd = Command.find(cmd_id)
             if cmd is not None:
-                log = LoggingMixin().logger
+                log = LoggingMixin().log
                 if cmd.status == 'done':
                     log.info('Command ID: %s has been succeeded, hence marking this '
                                 'TI as Success.', cmd_id)
@@ -99,7 +99,7 @@ class QuboleHook(BaseHook, LoggingMixin):
         args = self.cls.parse(self.create_cmd_args(context))
         self.cmd = self.cls.create(**args)
         context['task_instance'].xcom_push(key='qbol_cmd_id', value=self.cmd.id)
-        self.logger.info(
+        self.log.info(
             "Qubole command created with Id: %s and Status: %s",
             self.cmd.id, self.cmd.status
         )
@@ -107,10 +107,10 @@ class QuboleHook(BaseHook, LoggingMixin):
         while not Command.is_done(self.cmd.status):
             time.sleep(Qubole.poll_interval)
             self.cmd = self.cls.find(self.cmd.id)
-            self.logger.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
+            self.log.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
 
         if 'fetch_logs' in self.kwargs and self.kwargs['fetch_logs'] is True:
-            self.logger.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
+            self.log.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
 
         if self.cmd.status != 'done':
             raise AirflowException('Command Id: {0} failed with Status: {1}'.format(
@@ -126,7 +126,7 @@ class QuboleHook(BaseHook, LoggingMixin):
             cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=ti.task_id)
             self.cmd = self.cls.find(cmd_id)
         if self.cls and self.cmd:
-            self.logger.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
+            self.log.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
             self.cmd.cancel()
 
     def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/redis_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py
index a8999d6..278e196 100644
--- a/airflow/contrib/hooks/redis_hook.py
+++ b/airflow/contrib/hooks/redis_hook.py
@@ -19,7 +19,7 @@ from redis import StrictRedis
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class RedisHook(BaseHook, LoggingMixin):
@@ -41,7 +41,7 @@ class RedisHook(BaseHook, LoggingMixin):
         self.password = conn.password
         self.db = int(conn.extra_dejson.get('db', 0))
 
-        self.logger.debug(
+        self.log.debug(
             '''Connection "{conn}":
             \thost: {host}
             \tport: {port}
@@ -59,7 +59,7 @@ class RedisHook(BaseHook, LoggingMixin):
         Returns a Redis connection.
         """
         if not self.client:
-            self.logger.debug(
+            self.log.debug(
                 'generating Redis client for conn_id "%s" on %s:%s:%s',
                 self.redis_conn_id, self.host, self.port, self.db
             )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/salesforce_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py
index f2b5fef..0d0a104 100644
--- a/airflow/contrib/hooks/salesforce_hook.py
+++ b/airflow/contrib/hooks/salesforce_hook.py
@@ -29,7 +29,7 @@ import json
 import pandas as pd
 import time
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SalesforceHook(BaseHook, LoggingMixin):
@@ -92,10 +92,10 @@ class SalesforceHook(BaseHook, LoggingMixin):
         """
         self.sign_in()
 
-        self.logger.info("Querying for all objects")
+        self.log.info("Querying for all objects")
         query = self.sf.query_all(query)
 
-        self.logger.info(
+        self.log.info(
             "Received results: Total size: %s; Done: %s",
             query['totalSize'], query['done']
         )
@@ -144,7 +144,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
         field_string = self._build_field_list(fields)
 
         query = "SELECT {0} FROM {1}".format(field_string, obj)
-        self.logger.info(
+        self.log.info(
             "Making query to Salesforce: %s",
             query if len(query) < 30 else " ... ".join([query[:15], query[-15:]])
         )
@@ -169,7 +169,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
         try:
             col = pd.to_datetime(col)
         except ValueError:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.warning(
                 "Could not convert field to timestamps: %s", col.name
             )
@@ -265,7 +265,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
             # for each returned record
             object_name = query_results[0]['attributes']['type']
 
-            self.logger.info("Coercing timestamps for: %s", object_name)
+            self.log.info("Coercing timestamps for: %s", object_name)
 
             schema = self.describe_object(object_name)
 
@@ -299,7 +299,7 @@ class SalesforceHook(BaseHook, LoggingMixin):
             # there are also a ton of newline objects
             # that mess up our ability to write to csv
             # we remove these newlines so that the output is a valid CSV format
-            self.logger.info("Cleaning data and writing to CSV")
+            self.log.info("Cleaning data and writing to CSV")
             possible_strings = df.columns[df.dtypes == "object"]
             df[possible_strings] = df[possible_strings].apply(
                 lambda x: x.str.replace("\r\n", "")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index aa16130..6973023 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -16,7 +16,7 @@ import subprocess
 
 from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SparkSqlHook(BaseHook, LoggingMixin):
@@ -121,7 +121,7 @@ class SparkSqlHook(BaseHook, LoggingMixin):
             connection_cmd += ["--queue", self._yarn_queue]
 
         connection_cmd += cmd
-        self.logger.debug("Spark-Sql cmd: %s", connection_cmd)
+        self.log.debug("Spark-Sql cmd: %s", connection_cmd)
 
         return connection_cmd
 
@@ -151,5 +151,5 @@ class SparkSqlHook(BaseHook, LoggingMixin):
 
     def kill(self):
         if self._sp and self._sp.poll() is None:
-            self.logger.info("Killing the Spark-Sql job")
+            self.log.info("Killing the Spark-Sql job")
             self._sp.kill()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index bdd1efe..7d59cd2 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -18,7 +18,7 @@ import re
 
 from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SparkSubmitHook(BaseHook, LoggingMixin):
@@ -123,7 +123,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             conn_data['spark_home'] = extra.get('spark-home', None)
             conn_data['spark_binary'] = extra.get('spark-binary', 'spark-submit')
         except AirflowException:
-            self.logger.debug(
+            self.log.debug(
                 "Could not load connection string %s, defaulting to %s",
                 self._conn_id, conn_data['master']
             )
@@ -192,7 +192,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         if self._application_args:
             connection_cmd += self._application_args
 
-        self.logger.debug("Spark-Submit cmd: %s", connection_cmd)
+        self.log.debug("Spark-Submit cmd: %s", connection_cmd)
 
         return connection_cmd
 
@@ -239,15 +239,15 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
                     self._yarn_application_id = match.groups()[0]
 
             # Pass to logging
-            self.logger.info(line)
+            self.log.info(line)
 
     def on_kill(self):
         if self._sp and self._sp.poll() is None:
-            self.logger.info('Sending kill signal to %s', self._connection['spark_binary'])
+            self.log.info('Sending kill signal to %s', self._connection['spark_binary'])
             self._sp.kill()
 
             if self._yarn_application_id:
-                self.logger.info('Killing application on YARN')
+                self.log.info('Killing application on YARN')
                 kill_cmd = "yarn application -kill {0}".format(self._yarn_application_id).split()
                 yarn_kill = subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-                self.logger.info("YARN killed with return code: %s", yarn_kill.wait())
+                self.log.info("YARN killed with return code: %s", yarn_kill.wait())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 0584df4..5b00b15 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -20,7 +20,7 @@ import subprocess
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SqoopHook(BaseHook, LoggingMixin):
@@ -76,7 +76,7 @@ class SqoopHook(BaseHook, LoggingMixin):
             password_index = cmd.index('--password')
             cmd[password_index + 1] = 'MASKED'
         except ValueError:
-            self.logger.debug("No password in sqoop cmd")
+            self.log.debug("No password in sqoop cmd")
         return cmd
 
     def Popen(self, cmd, **kwargs):
@@ -87,18 +87,18 @@ class SqoopHook(BaseHook, LoggingMixin):
         :param kwargs: extra arguments to Popen (see subprocess.Popen)
         :return: handle to subprocess
         """
-        self.logger.info("Executing command: %s", ' '.join(cmd))
+        self.log.info("Executing command: %s", ' '.join(cmd))
         sp = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               **kwargs)
 
         for line in iter(sp.stdout):
-            self.logger.info(line.strip())
+            self.log.info(line.strip())
 
         sp.wait()
 
-        self.logger.info("Command exited with return code %s", sp.returncode)
+        self.log.info("Command exited with return code %s", sp.returncode)
 
         if sp.returncode:
             raise AirflowException("Sqoop command failed: %s", ' '.join(cmd))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/hooks/ssh_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py
index 3fe9146..b061fd7 100755
--- a/airflow/contrib/hooks/ssh_hook.py
+++ b/airflow/contrib/hooks/ssh_hook.py
@@ -23,7 +23,7 @@ import paramiko
 from contextlib import contextmanager
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SSHHook(BaseHook, LoggingMixin):
@@ -70,7 +70,7 @@ class SSHHook(BaseHook, LoggingMixin):
 
     def get_conn(self):
         if not self.client:
-            self.logger.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
+            self.log.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
             if self.ssh_conn_id is not None:
                 conn = self.get_connection(self.ssh_conn_id)
                 if self.username is None:
@@ -98,7 +98,7 @@ class SSHHook(BaseHook, LoggingMixin):
 
             # Auto detecting username values from system
             if not self.username:
-                self.logger.debug(
+                self.log.debug(
                     "username to ssh to host: %s is not specified for connection id"
                     " %s. Using system's default provided by getpass.getuser()",
                     self.remote_host, self.ssh_conn_id
@@ -142,17 +142,17 @@ class SSHHook(BaseHook, LoggingMixin):
 
                 self.client = client
             except paramiko.AuthenticationException as auth_error:
-                self.logger.error(
+                self.log.error(
                     "Auth failed while connecting to host: %s, error: %s",
                     self.remote_host, auth_error
                 )
             except paramiko.SSHException as ssh_error:
-                self.logger.error(
+                self.log.error(
                     "Failed connecting to host: %s, error: %s",
                     self.remote_host, ssh_error
                 )
             except Exception as error:
-                self.logger.error(
+                self.log.error(
                     "Error connecting to host: %s, error: %s",
                     self.remote_host, error
                 )
@@ -191,7 +191,7 @@ class SSHHook(BaseHook, LoggingMixin):
                           ]
 
         ssh_cmd += ssh_tunnel_cmd
-        self.logger.debug("Creating tunnel with cmd: %s", ssh_cmd)
+        self.log.debug("Creating tunnel with cmd: %s", ssh_cmd)
 
         proc = subprocess.Popen(ssh_cmd,
                                 stdin=subprocess.PIPE,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 37e4a97..a2ba824 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -87,7 +87,7 @@ class BigQueryOperator(BaseOperator):
         self.query_params = query_params
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.bql)
+        self.log.info('Executing: %s', self.bql)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                             delegate_to=self.delegate_to)
         conn = hook.get_conn()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_table_delete_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py
index 21de7cc..0f4ef50 100644
--- a/airflow/contrib/operators/bigquery_table_delete_operator.py
+++ b/airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -53,7 +53,7 @@ class BigQueryTableDeleteOperator(BaseOperator):
         self.ignore_if_missing = ignore_if_missing
 
     def execute(self, context):
-        self.logger.info('Deleting: %s', self.deletion_dataset_table)
+        self.log.info('Deleting: %s', self.deletion_dataset_table)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                             delegate_to=self.delegate_to)
         conn = hook.get_conn()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_to_bigquery.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py
index 8e21270..2bc4a8b 100644
--- a/airflow/contrib/operators/bigquery_to_bigquery.py
+++ b/airflow/contrib/operators/bigquery_to_bigquery.py
@@ -68,7 +68,7 @@ class BigQueryToBigQueryOperator(BaseOperator):
         self.delegate_to = delegate_to
 
     def execute(self, context):
-        self.logger.info(
+        self.log.info(
             'Executing copy of %s into: %s',
             self.source_project_dataset_tables, self.destination_project_dataset_table
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/bigquery_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py
index 23a2029..800e7bd 100644
--- a/airflow/contrib/operators/bigquery_to_gcs.py
+++ b/airflow/contrib/operators/bigquery_to_gcs.py
@@ -79,9 +79,9 @@ class BigQueryToCloudStorageOperator(BaseOperator):
         self.delegate_to = delegate_to
 
     def execute(self, context):
-        self.logger.info('Executing extract of %s into: %s',
-                     self.source_project_dataset_table,
-                     self.destination_cloud_storage_uris)
+        self.log.info('Executing extract of %s into: %s',
+                      self.source_project_dataset_table,
+                      self.destination_cloud_storage_uris)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                             delegate_to=self.delegate_to)
         conn = hook.get_conn()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index 8773357..cffc4ff 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -214,7 +214,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
             raise AirflowException(msg)
 
     def _log_run_page_url(self, url):
-        self.logger.info('View run status, Spark UI, and logs at %s', url)
+        self.log.info('View run status, Spark UI, and logs at %s', url)
 
     def get_hook(self):
         return DatabricksHook(
@@ -225,13 +225,13 @@ class DatabricksSubmitRunOperator(BaseOperator):
         hook = self.get_hook()
         self.run_id = hook.submit_run(self.json)
         run_page_url = hook.get_run_page_url(self.run_id)
-        self.logger.info('Run submitted with run_id: %s', self.run_id)
+        self.log.info('Run submitted with run_id: %s', self.run_id)
         self._log_run_page_url(run_page_url)
         while True:
             run_state = hook.get_run_state(self.run_id)
             if run_state.is_terminal:
                 if run_state.is_successful:
-                    self.logger.info('%s completed successfully.', self.task_id)
+                    self.log.info('%s completed successfully.', self.task_id)
                     self._log_run_page_url(run_page_url)
                     return
                 else:
@@ -240,15 +240,15 @@ class DatabricksSubmitRunOperator(BaseOperator):
                         s=run_state)
                     raise AirflowException(error_message)
             else:
-                self.logger.info('%s in run state: %s', self.task_id, run_state)
+                self.log.info('%s in run state: %s', self.task_id, run_state)
                 self._log_run_page_url(run_page_url)
-                self.logger.info('Sleeping for %s seconds.', self.polling_period_seconds)
+                self.log.info('Sleeping for %s seconds.', self.polling_period_seconds)
                 time.sleep(self.polling_period_seconds)
 
     def on_kill(self):
         hook = self.get_hook()
         hook.cancel_run(self.run_id)
-        self.logger.info(
+        self.log.info(
             'Task: %s with run_id: %s was requested to be cancelled.',
             self.task_id, self.run_id
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 3c22b60..bdb0335 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -177,12 +177,12 @@ class DataprocClusterCreateOperator(BaseOperator):
         while True:
             state = self._get_cluster_state(service)
             if state is None:
-                self.logger.info("No state for cluster '%s'", self.cluster_name)
+                self.log.info("No state for cluster '%s'", self.cluster_name)
                 time.sleep(15)
             else:
-                self.logger.info("State for cluster '%s' is %s", self.cluster_name, state)
+                self.log.info("State for cluster '%s' is %s", self.cluster_name, state)
                 if self._cluster_ready(state, service):
-                    self.logger.info(
+                    self.log.info(
                         "Cluster '%s' successfully created", self.cluster_name
                     )
                     return
@@ -264,7 +264,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         return cluster_data
 
     def execute(self, context):
-        self.logger.info('Creating cluster: %s', self.cluster_name)
+        self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataProcHook(
             gcp_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to
@@ -272,7 +272,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         service = hook.get_conn()
 
         if self._get_cluster(service):
-            self.logger.info(
+            self.log.info(
                 'Cluster %s already exists... Checking status...',
                 self.cluster_name
             )
@@ -290,7 +290,7 @@ class DataprocClusterCreateOperator(BaseOperator):
             # probably two cluster start commands at the same time
             time.sleep(10)
             if self._get_cluster(service):
-                self.logger.info(
+                self.log.info(
                     'Cluster {} already exists... Checking status...',
                     self.cluster_name
                  )
@@ -358,7 +358,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
             time.sleep(15)
 
     def execute(self, context):
-        self.logger.info('Deleting cluster: %s', self.cluster_name)
+        self.log.info('Deleting cluster: %s', self.cluster_name)
         hook = DataProcHook(
             gcp_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to
@@ -371,7 +371,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
             clusterName=self.cluster_name
         ).execute()
         operation_name = response['name']
-        self.logger.info("Cluster delete operation name: %s", operation_name)
+        self.log.info("Cluster delete operation name: %s", operation_name)
         self._wait_for_done(service, operation_name)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/datastore_export_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py
index 76415e1..51e1d06 100644
--- a/airflow/contrib/operators/datastore_export_operator.py
+++ b/airflow/contrib/operators/datastore_export_operator.py
@@ -78,7 +78,7 @@ class DatastoreExportOperator(BaseOperator):
         self.xcom_push = xcom_push
 
     def execute(self, context):
-        self.logger.info('Exporting data to Cloud Storage bucket ' + self.bucket)
+        self.log.info('Exporting data to Cloud Storage bucket ' + self.bucket)
 
         if self.overwrite_existing and self.namespace:
             gcs_hook = GoogleCloudStorageHook(self.cloud_storage_conn_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/datastore_import_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py
index 74bd940..d8c42e7 100644
--- a/airflow/contrib/operators/datastore_import_operator.py
+++ b/airflow/contrib/operators/datastore_import_operator.py
@@ -72,7 +72,7 @@ class DatastoreImportOperator(BaseOperator):
         self.xcom_push = xcom_push
 
     def execute(self, context):
-        self.logger.info('Importing data from Cloud Storage bucket %s', self.bucket)
+        self.log.info('Importing data from Cloud Storage bucket %s', self.bucket)
         ds_hook = DatastoreHook(self.datastore_conn_id, self.delegate_to)
         result = ds_hook.import_from_storage_bucket(bucket=self.bucket,
                                                     file=self.file,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/ecs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py
index 0c75eaa..898a77a 100644
--- a/airflow/contrib/operators/ecs_operator.py
+++ b/airflow/contrib/operators/ecs_operator.py
@@ -56,11 +56,11 @@ class ECSOperator(BaseOperator):
         self.hook = self.get_hook()
 
     def execute(self, context):
-        self.logger.info(
+        self.log.info(
             'Running ECS Task - Task definition: %s - on cluster %s',
             self.task_definition,self.cluster
         )
-        self.logger.info('ECSOperator overrides: %s', self.overrides)
+        self.log.info('ECSOperator overrides: %s', self.overrides)
 
         self.client = self.hook.get_client_type(
             'ecs',
@@ -77,13 +77,13 @@ class ECSOperator(BaseOperator):
         failures = response['failures']
         if len(failures) > 0:
             raise AirflowException(response)
-        self.logger.info('ECS Task started: %s', response)
+        self.log.info('ECS Task started: %s', response)
 
         self.arn = response['tasks'][0]['taskArn']
         self._wait_for_task_ended()
 
         self._check_success_task()
-        self.logger.info('ECS Task has been successfully executed: %s', response)
+        self.log.info('ECS Task has been successfully executed: %s', response)
 
     def _wait_for_task_ended(self):
         waiter = self.client.get_waiter('tasks_stopped')
@@ -98,7 +98,7 @@ class ECSOperator(BaseOperator):
             cluster=self.cluster,
             tasks=[self.arn]
         )
-        self.logger.info('ECS Task stopped, check status: %s', response)
+        self.log.info('ECS Task stopped, check status: %s', response)
 
         if len(response.get('failures', [])) > 0:
             raise AirflowException(response)
@@ -124,4 +124,4 @@ class ECSOperator(BaseOperator):
             cluster=self.cluster,
             task=self.arn,
             reason='Task killed by the user')
-        self.logger.info(response)
+        self.log.info(response)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py
index dbf764e..227474e 100644
--- a/airflow/contrib/operators/emr_add_steps_operator.py
+++ b/airflow/contrib/operators/emr_add_steps_operator.py
@@ -48,11 +48,11 @@ class EmrAddStepsOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        self.logger.info('Adding steps to %s', self.job_flow_id)
+        self.log.info('Adding steps to %s', self.job_flow_id)
         response = emr.add_job_flow_steps(JobFlowId=self.job_flow_id, Steps=self.steps)
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('Adding steps failed: %s' % response)
         else:
-            self.logger.info('Steps %s added to JobFlow', response['StepIds'])
+            self.log.info('Steps %s added to JobFlow', response['StepIds'])
             return response['StepIds']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py
index 4e40b17..2544adf 100644
--- a/airflow/contrib/operators/emr_create_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_create_job_flow_operator.py
@@ -50,7 +50,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id)
 
-        self.logger.info(
+        self.log.info(
             'Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s',
             self.aws_conn_id, self.emr_conn_id
         )
@@ -59,5 +59,5 @@ class EmrCreateJobFlowOperator(BaseOperator):
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('JobFlow creation failed: %s' % response)
         else:
-            self.logger.info('JobFlow with id %s created', response['JobFlowId'])
+            self.log.info('JobFlow with id %s created', response['JobFlowId'])
             return response['JobFlowId']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
index df641ad..ec29897 100644
--- a/airflow/contrib/operators/emr_terminate_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
@@ -43,10 +43,10 @@ class EmrTerminateJobFlowOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        self.logger.info('Terminating JobFlow %s', self.job_flow_id)
+        self.log.info('Terminating JobFlow %s', self.job_flow_id)
         response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('JobFlow termination failed: %s' % response)
         else:
-            self.logger.info('JobFlow with id %s terminated', self.job_flow_id)
+            self.log.info('JobFlow with id %s terminated', self.job_flow_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/file_to_wasb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py
index 4519e1e..3478dd3 100644
--- a/airflow/contrib/operators/file_to_wasb.py
+++ b/airflow/contrib/operators/file_to_wasb.py
@@ -51,7 +51,7 @@ class FileToWasbOperator(BaseOperator):
     def execute(self, context):
         """Upload a file to Azure Blob Storage."""
         hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
-        self.logger.info(
+        self.log.info(
             'Uploading {self.file_path} to wasb://{self.container_name} as {self.blob_name}'.format(**locals())
         )
         hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/fs_operator.py b/airflow/contrib/operators/fs_operator.py
index ca7d716..e7640c8 100644
--- a/airflow/contrib/operators/fs_operator.py
+++ b/airflow/contrib/operators/fs_operator.py
@@ -48,7 +48,7 @@ class FileSensor(BaseSensorOperator):
         hook = FSHook(self.fs_conn_id)
         basepath = hook.get_path()
         full_path = "/".join([basepath, self.filepath])
-        self.logger.info('Poking for file {full_path}'.format(**locals()))
+        self.log.info('Poking for file {full_path}'.format(**locals()))
         try:
             files = [f for f in walk(full_path)]
         except:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/gcs_download_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py
index 27e85b7..53516b1 100644
--- a/airflow/contrib/operators/gcs_download_operator.py
+++ b/airflow/contrib/operators/gcs_download_operator.py
@@ -65,7 +65,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
         self.delegate_to = delegate_to
 
     def execute(self, context):
-        self.logger.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
+        self.log.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
         hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                                       delegate_to=self.delegate_to)
         file_bytes = hook.download(self.bucket, self.object, self.filename)
@@ -74,4 +74,4 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
                 context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
             else:
                 raise RuntimeError('The size of the downloaded file is too large to push to XCom!')
-        self.logger.info(file_bytes)
+        self.log.info(file_bytes)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 01f53cc..730a3bc 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -189,7 +189,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
                 self.destination_project_dataset_table))
             row = cursor.fetchone()
             max_id = row[0] if row[0] else 0
-            self.logger.info(
+            self.log.info(
                 'Loaded BQ data with max %s.%s=%s',
                 self.destination_project_dataset_table, self.max_id_key, max_id
             )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/hipchat_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/hipchat_operator.py b/airflow/contrib/operators/hipchat_operator.py
index 19c6d76..d82ad61 100644
--- a/airflow/contrib/operators/hipchat_operator.py
+++ b/airflow/contrib/operators/hipchat_operator.py
@@ -66,8 +66,8 @@ class HipChatAPIOperator(BaseOperator):
                                         'Authorization': 'Bearer %s' % self.token},
                                     data=self.body)
         if response.status_code >= 400:
-            self.logger.error('HipChat API call failed: %s %s',
-                          response.status_code, response.reason)
+            self.log.error('HipChat API call failed: %s %s',
+                           response.status_code, response.reason)
             raise AirflowException('HipChat API call failed: %s %s' %
                                    (response.status_code, response.reason))
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/mlengine_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mlengine_operator.py b/airflow/contrib/operators/mlengine_operator.py
index fdbfede..4d8943b 100644
--- a/airflow/contrib/operators/mlengine_operator.py
+++ b/airflow/contrib/operators/mlengine_operator.py
@@ -22,9 +22,9 @@ from airflow.operators import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from apiclient import errors
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def _create_prediction_input(project_id,
@@ -225,7 +225,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
                 model_name, version_name, uri, max_worker_count,
                 runtime_version)
         except ValueError as e:
-            self.logger.error(
+            self.log.error(
                 'Cannot create batch prediction job request due to: %s',
                 e
             )
@@ -251,7 +251,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
             raise
 
         if finished_prediction_job['state'] != 'SUCCEEDED':
-            self.logger.error(
+            self.log.error(
                 'Batch prediction job failed: %s',
                 str(finished_prediction_job))
             raise RuntimeError(finished_prediction_job['errorMessage'])
@@ -538,8 +538,8 @@ class MLEngineTrainingOperator(BaseOperator):
         }
 
         if self._mode == 'DRY_RUN':
-            self.logger.info('In dry_run mode.')
-            self.logger.info('MLEngine Training job request is: {}'.format(training_request))
+            self.log.info('In dry_run mode.')
+            self.log.info('MLEngine Training job request is: {}'.format(training_request))
             return
 
         hook = MLEngineHook(
@@ -557,6 +557,6 @@ class MLEngineTrainingOperator(BaseOperator):
             raise
 
         if finished_training_job['state'] != 'SUCCEEDED':
-            self.logger.error('MLEngine training job failed: {}'.format(
+            self.log.error('MLEngine training job failed: {}'.format(
                 str(finished_training_job)))
             raise RuntimeError(finished_training_job['errorMessage'])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index f7b3a5a..c8ebcd0 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -168,7 +168,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
                 'mode': field_mode,
             })
 
-        self.logger.info('Using schema for %s: %s', self.schema_filename, schema)
+        self.log.info('Using schema for %s: %s', self.schema_filename, schema)
         tmp_schema_file_handle = NamedTemporaryFile(mode='w', delete=True)
         json.dump(schema, tmp_schema_file_handle)
         return {self.schema_filename: tmp_schema_file_handle}