You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/13 07:37:33 UTC

[5/5] incubator-airflow git commit: [AIRFLOW-1582] Improve logging within Airflow

[AIRFLOW-1582] Improve logging within Airflow

Clean the way of logging within Airflow. Remove
the old logging.py and
move to the airflow.utils.log.* interface. Remove
setting the logging
outside of the settings/configuration code. Move
away from the string
format to logging_function(msg, *args).

Closes #2592 from Fokko/AIRFLOW-1582-Improve-
logging-structure


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a7a51890
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a7a51890
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a7a51890

Branch: refs/heads/master
Commit: a7a518902dcf1e7fd4bf477cf57cee691f181b29
Parents: 5de632e
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Wed Sep 13 09:36:58 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Sep 13 09:36:58 2017 +0200

----------------------------------------------------------------------
 UPDATING.md                                     |   4 +-
 airflow/__init__.py                             |  10 +-
 airflow/api/__init__.py                         |  12 +-
 airflow/api/auth/backend/kerberos_auth.py       |  14 +-
 airflow/api/common/experimental/get_task.py     |   4 -
 .../common/experimental/get_task_instance.py    |   4 -
 airflow/bin/airflow                             |   1 -
 airflow/bin/cli.py                              |  46 +--
 airflow/configuration.py                        |  25 +-
 .../auth/backends/github_enterprise_auth.py     |   5 +-
 airflow/contrib/auth/backends/google_auth.py    |  11 +-
 airflow/contrib/auth/backends/kerberos_auth.py  |   5 +-
 airflow/contrib/auth/backends/ldap_auth.py      |  32 +-
 airflow/contrib/auth/backends/password_auth.py  |  10 +-
 airflow/contrib/executors/mesos_executor.py     |  51 +--
 airflow/contrib/hooks/bigquery_hook.py          |  57 +--
 airflow/contrib/hooks/cloudant_hook.py          |  10 +-
 airflow/contrib/hooks/databricks_hook.py        |  15 +-
 airflow/contrib/hooks/datadog_hook.py           |  13 +-
 airflow/contrib/hooks/datastore_hook.py         |   3 +-
 airflow/contrib/hooks/ftp_hook.py               |  10 +-
 airflow/contrib/hooks/gcp_api_base_hook.py      |  13 +-
 airflow/contrib/hooks/gcp_dataflow_hook.py      |  29 +-
 airflow/contrib/hooks/gcp_dataproc_hook.py      |  28 +-
 airflow/contrib/hooks/gcp_mlengine_hook.py      |  48 ++-
 airflow/contrib/hooks/gcs_hook.py               |  10 +-
 airflow/contrib/hooks/jira_hook.py              |   9 +-
 airflow/contrib/hooks/qubole_hook.py            |  22 +-
 airflow/contrib/hooks/redis_hook.py             |  16 +-
 airflow/contrib/hooks/salesforce_hook.py        |  31 +-
 airflow/contrib/hooks/spark_sql_hook.py         |  10 +-
 airflow/contrib/hooks/spark_submit_hook.py      |  24 +-
 airflow/contrib/hooks/sqoop_hook.py             |  17 +-
 airflow/contrib/hooks/ssh_hook.py               |  34 +-
 airflow/contrib/operators/bigquery_operator.py  |   4 +-
 .../operators/bigquery_table_delete_operator.py |   4 +-
 .../contrib/operators/bigquery_to_bigquery.py   |   8 +-
 airflow/contrib/operators/bigquery_to_gcs.py    |   4 +-
 .../contrib/operators/databricks_operator.py    |  25 +-
 airflow/contrib/operators/dataproc_operator.py  |  30 +-
 .../operators/datastore_export_operator.py      |   5 +-
 .../operators/datastore_import_operator.py      |   6 +-
 airflow/contrib/operators/ecs_operator.py       |  24 +-
 .../contrib/operators/emr_add_steps_operator.py |   7 +-
 .../operators/emr_create_job_flow_operator.py   |  10 +-
 .../emr_terminate_job_flow_operator.py          |   7 +-
 airflow/contrib/operators/file_to_wasb.py       |  14 +-
 airflow/contrib/operators/fs_operator.py        |   4 +-
 .../contrib/operators/gcs_download_operator.py  |   6 +-
 airflow/contrib/operators/gcs_to_bq.py          |   8 +-
 airflow/contrib/operators/hipchat_operator.py   |   3 +-
 airflow/contrib/operators/mlengine_operator.py  |  30 +-
 .../operators/mlengine_prediction_summary.py    |   2 -
 airflow/contrib/operators/mysql_to_gcs.py       |   4 +-
 airflow/contrib/operators/sftp_operator.py      |   7 +-
 .../contrib/operators/spark_submit_operator.py  |   6 +-
 airflow/contrib/operators/ssh_operator.py       |   1 -
 airflow/contrib/operators/vertica_operator.py   |   5 +-
 airflow/contrib/operators/vertica_to_hive.py    |   5 +-
 airflow/contrib/sensors/bigquery_sensor.py      |   5 +-
 airflow/contrib/sensors/datadog_sensor.py       |   5 +-
 airflow/contrib/sensors/emr_base_sensor.py      |   9 +-
 airflow/contrib/sensors/emr_job_flow_sensor.py  |   6 +-
 airflow/contrib/sensors/emr_step_sensor.py      |   5 +-
 airflow/contrib/sensors/ftp_sensor.py           |   4 +-
 airflow/contrib/sensors/gcs_sensor.py           |   7 +-
 airflow/contrib/sensors/hdfs_sensors.py         |  12 +-
 airflow/contrib/sensors/jira_sensor.py          |  27 +-
 airflow/contrib/sensors/redis_key_sensor.py     |   4 -
 airflow/contrib/sensors/wasb_sensor.py          |  11 +-
 .../contrib/task_runner/cgroup_task_runner.py   |  49 +--
 airflow/executors/__init__.py                   |   8 +-
 airflow/executors/base_executor.py              |  18 +-
 airflow/executors/celery_executor.py            |  24 +-
 airflow/executors/dask_executor.py              |  10 +-
 airflow/executors/local_executor.py             |  11 +-
 airflow/executors/sequential_executor.py        |   4 +-
 airflow/hooks/S3_hook.py                        |  52 +--
 airflow/hooks/base_hook.py                      |   9 +-
 airflow/hooks/dbapi_hook.py                     |  33 +-
 airflow/hooks/druid_hook.py                     |  10 +-
 airflow/hooks/hive_hooks.py                     |  37 +-
 airflow/hooks/http_hook.py                      |   7 +-
 airflow/hooks/oracle_hook.py                    |   9 +-
 airflow/hooks/pig_hook.py                       |   6 +-
 airflow/hooks/presto_hook.py                    |   4 -
 airflow/hooks/webhdfs_hook.py                   |  17 +-
 airflow/hooks/zendesk_hook.py                   |  20 +-
 airflow/jobs.py                                 | 349 ++++++++++---------
 airflow/models.py                               | 211 +++++------
 airflow/operators/bash_operator.py              |  23 +-
 airflow/operators/check_operator.py             |  24 +-
 airflow/operators/dagrun_operator.py            |   6 +-
 airflow/operators/docker_operator.py            |  12 +-
 airflow/operators/generic_transfer.py           |  13 +-
 airflow/operators/hive_operator.py              |   4 +-
 airflow/operators/hive_stats_operator.py        |  10 +-
 airflow/operators/hive_to_druid.py              |  18 +-
 airflow/operators/hive_to_mysql.py              |  14 +-
 airflow/operators/hive_to_samba_operator.py     |   6 +-
 airflow/operators/http_operator.py              |   8 +-
 airflow/operators/jdbc_operator.py              |   7 +-
 airflow/operators/latest_only_operator.py       |  19 +-
 airflow/operators/mssql_operator.py             |   5 +-
 airflow/operators/mssql_to_hive.py              |   6 +-
 airflow/operators/mysql_operator.py             |   5 +-
 airflow/operators/mysql_to_hive.py              |   5 +-
 airflow/operators/oracle_operator.py            |   5 +-
 airflow/operators/pig_operator.py               |   4 +-
 airflow/operators/postgres_operator.py          |   5 +-
 airflow/operators/presto_to_mysql.py            |  12 +-
 airflow/operators/python_operator.py            |  24 +-
 airflow/operators/redshift_to_s3_operator.py    |  17 +-
 airflow/operators/s3_file_transform_operator.py |  30 +-
 airflow/operators/s3_to_hive_operator.py        |  39 +--
 airflow/operators/sensors.py                    |  49 +--
 airflow/operators/slack_operator.py             |   9 +-
 airflow/operators/sqlite_operator.py            |   5 +-
 airflow/plugins_manager.py                      |  11 +-
 airflow/security/kerberos.py                    |  25 +-
 airflow/settings.py                             |  17 +-
 airflow/task_runner/base_task_runner.py         |   9 +-
 airflow/utils/dag_processing.py                 |  55 +--
 airflow/utils/db.py                             |  11 +-
 airflow/utils/email.py                          |   8 +-
 airflow/utils/log/LoggingMixin.py               |  45 +++
 airflow/utils/log/file_task_handler.py          |  34 +-
 airflow/utils/log/gcs_task_handler.py           | 125 ++++++-
 airflow/utils/log/s3_task_handler.py            |  97 +++++-
 airflow/utils/logging.py                        | 252 -------------
 airflow/utils/timeout.py                        |  17 +-
 airflow/www/api/experimental/endpoints.py       |   6 +-
 airflow/www/app.py                              |  10 +-
 airflow/www/views.py                            |   9 +-
 setup.py                                        |  11 -
 tests/contrib/hooks/test_databricks_hook.py     |  15 +-
 .../contrib/operators/test_dataproc_operator.py |  59 ++--
 tests/core.py                                   |  16 +-
 tests/operators/sensors.py                      |  53 +--
 tests/utils/log/test_logging.py                 | 108 ++++++
 tests/utils/test_logging.py                     | 103 ------
 141 files changed, 1578 insertions(+), 1747 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 92ee4b4..cde7141 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -13,7 +13,9 @@ assists people when migrating to a new version.
   - No updates are required if you are using ftpHook, it will continue work as is.
 
 ### Logging update
-  Logs now are stored in the log folder as ``{dag_id}/{task_id}/{execution_date}/{try_number}.log``.
+Airflow's logging has been rewritten to uses Python’s builtin `logging` module to perform system logging. By extending classes with the existing `LoggingMixin`, all the logging will go through a central logger. The main benefit that this brings to us is the easy configuration of the logging through `default_airflow_logging.py` and the ability to use different handlers for logging.
+
+Logs now are stored in the log folder as `{dag_id}/{task_id}/{execution_date}/{try_number}.log`.
 
 ### New Features
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 3daa6e2..8844eeb 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -21,9 +21,10 @@ in their PYTHONPATH. airflow_login should be based off the
 """
 from builtins import object
 from airflow import version
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
 __version__ = version.version
 
-import logging
 import sys
 
 from airflow import configuration as conf
@@ -40,13 +41,15 @@ login = None
 
 
 def load_login():
+    log = LoggingMixin().logger
+
     auth_backend = 'airflow.default_login'
     try:
         if conf.getboolean('webserver', 'AUTHENTICATE'):
             auth_backend = conf.get('webserver', 'auth_backend')
     except conf.AirflowConfigException:
         if conf.getboolean('webserver', 'AUTHENTICATE'):
-            logging.warning(
+            log.warning(
                 "auth_backend not found in webserver config reverting to "
                 "*deprecated*  behavior of importing airflow_login")
             auth_backend = "airflow_login"
@@ -55,7 +58,7 @@ def load_login():
         global login
         login = import_module(auth_backend)
     except ImportError as err:
-        logging.critical(
+        log.critical(
             "Cannot import authentication module %s. "
             "Please correct your authentication backend or disable authentication: %s",
             auth_backend, err
@@ -76,7 +79,6 @@ from airflow import operators
 from airflow import hooks
 from airflow import executors
 from airflow import macros
-from airflow import contrib
 
 operators._integrate_plugins()
 hooks._integrate_plugins()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/api/__init__.py b/airflow/api/__init__.py
index ae47abf..39edbed 100644
--- a/airflow/api/__init__.py
+++ b/airflow/api/__init__.py
@@ -13,14 +13,16 @@
 # limitations under the License.
 from __future__ import print_function
 
-import logging
-
 from airflow.exceptions import AirflowException
 from airflow import configuration as conf
 from importlib import import_module
 
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
 api_auth = None
 
+log = LoggingMixin().logger
+
 
 def load_auth():
     auth_backend = 'airflow.api.auth.backend.default'
@@ -33,6 +35,8 @@ def load_auth():
         global api_auth
         api_auth = import_module(auth_backend)
     except ImportError as err:
-        logging.critical("Cannot import {} for API authentication due to: {}"
-                         .format(auth_backend, err))
+        log.critical(
+            "Cannot import %s for API authentication due to: %s",
+            auth_backend, err
+        )
         raise AirflowException(err)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/auth/backend/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/api/auth/backend/kerberos_auth.py b/airflow/api/auth/backend/kerberos_auth.py
index d1c3b70..73a5aa2 100644
--- a/airflow/api/auth/backend/kerberos_auth.py
+++ b/airflow/api/auth/backend/kerberos_auth.py
@@ -23,10 +23,12 @@
 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 from future.standard_library import install_aliases
+
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
 install_aliases()
 
 import kerberos
-import logging
 import os
 
 from airflow import configuration as conf
@@ -45,6 +47,8 @@ client_auth = HTTPKerberosAuth(service='airflow')
 
 _SERVICE_NAME = None
 
+log = LoggingMixin().logger
+
 
 def init_app(app):
     global _SERVICE_NAME
@@ -52,7 +56,7 @@ def init_app(app):
     hostname = app.config.get('SERVER_NAME')
     if not hostname:
         hostname = getfqdn()
-    logging.info("Kerberos: hostname {}".format(hostname))
+    log.info("Kerberos: hostname %s", hostname)
 
     service = 'airflow'
 
@@ -62,12 +66,12 @@ def init_app(app):
         os.environ['KRB5_KTNAME'] = conf.get('kerberos', 'keytab')
 
     try:
-        logging.info("Kerberos init: {} {}".format(service, hostname))
+        log.info("Kerberos init: %s %s", service, hostname)
         principal = kerberos.getServerPrincipalDetails(service, hostname)
     except kerberos.KrbError as err:
-        logging.warning("Kerberos: {}".format(err))
+        log.warning("Kerberos: %s", err)
     else:
-        logging.info("Kerberos API: server is {}".format(principal))
+        log.info("Kerberos API: server is %s", principal)
 
 
 def _unauthorized():

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/common/experimental/get_task.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/get_task.py b/airflow/api/common/experimental/get_task.py
index 39ab423..9023ad1 100644
--- a/airflow/api/common/experimental/get_task.py
+++ b/airflow/api/common/experimental/get_task.py
@@ -12,13 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-
 from airflow.exceptions import AirflowException
 from airflow.models import DagBag
 
-_log = logging.getLogger(__name__)
-
 
 def get_task(dag_id, task_id):
     """Return the task object identified by the given dag_id and task_id."""

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/api/common/experimental/get_task_instance.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/get_task_instance.py b/airflow/api/common/experimental/get_task_instance.py
index 4c50731..7ab5e6e 100644
--- a/airflow/api/common/experimental/get_task_instance.py
+++ b/airflow/api/common/experimental/get_task_instance.py
@@ -12,13 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-
 from airflow.exceptions import AirflowException
 from airflow.models import DagBag
 
-_log = logging.getLogger(__name__)
-
 
 def get_task_instance(dag_id, task_id, execution_date):
     """Return the task object identified by the given dag_id and task_id."""

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/bin/airflow
----------------------------------------------------------------------
diff --git a/airflow/bin/airflow b/airflow/bin/airflow
index 0598596..2c0024d 100755
--- a/airflow/bin/airflow
+++ b/airflow/bin/airflow
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging
 import os
 from airflow import configuration
 from airflow.bin.cli import CLIFactory

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index a0545c3..56f1855 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -53,6 +53,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
 
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
+from airflow.utils.log.LoggingMixin import LoggingMixin
 from airflow.www.app import cached_app
 
 from sqlalchemy import func
@@ -63,6 +64,8 @@ api_module = import_module(conf.get('cli', 'api_client'))
 api_client = api_module.Client(api_base_url=conf.get('cli', 'endpoint_url'),
                                auth=api.api_auth.client_auth)
 
+log = LoggingMixin().logger
+
 
 def sigint_handler(sig, frame):
     sys.exit(0)
@@ -186,19 +189,21 @@ def trigger_dag(args):
     :param args:
     :return:
     """
+    log = LoggingMixin().logger
     try:
         message = api_client.trigger_dag(dag_id=args.dag_id,
                                          run_id=args.run_id,
                                          conf=args.conf,
                                          execution_date=args.exec_date)
     except IOError as err:
-        logging.error(err)
+        log.error(err)
         raise AirflowException(err)
-
-    logging.info(message)
+    log.info(message)
 
 
 def pool(args):
+    log = LoggingMixin().logger
+
     def _tabulate(pools):
         return "\n%s" % tabulate(pools, ['Pool', 'Slots', 'Description'],
                                  tablefmt="fancy_grid")
@@ -215,9 +220,9 @@ def pool(args):
         else:
             pools = api_client.get_pools()
     except (AirflowException, IOError) as err:
-        logging.error(err)
+        log.error(err)
     else:
-        logging.info(_tabulate(pools=pools))
+        log.info(_tabulate(pools=pools))
 
 
 def variables(args):
@@ -325,6 +330,8 @@ def run(args, dag=None):
     if dag:
         args.dag_id = dag.dag_id
 
+    log = LoggingMixin().logger
+
     # Load custom airflow config
     if args.cfg_path:
         with open(args.cfg_path, 'r') as conf_file:
@@ -343,7 +350,7 @@ def run(args, dag=None):
         dag = get_dag(args)
     elif not dag:
         session = settings.Session()
-        logging.info('Loading pickle id {args.pickle}'.format(args=args))
+        log.info('Loading pickle id {args.pickle}'.format(args=args))
         dag_pickle = session.query(
             DagPickle).filter(DagPickle.id == args.pickle).first()
         if not dag_pickle:
@@ -354,11 +361,11 @@ def run(args, dag=None):
     ti = TaskInstance(task, args.execution_date)
     ti.refresh_from_db()
 
-    logger = logging.getLogger('airflow.task')
+    log = logging.getLogger('airflow.task')
     if args.raw:
-        logger = logging.getLogger('airflow.task.raw')
+        log = logging.getLogger('airflow.task.raw')
 
-    for handler in logger.handlers:
+    for handler in log.handlers:
         try:
             handler.set_context(ti)
         except AttributeError:
@@ -367,7 +374,7 @@ def run(args, dag=None):
             pass
 
     hostname = socket.getfqdn()
-    logging.info("Running on host {}".format(hostname))
+    log.info("Running on host %s", hostname)
 
     if args.local:
         run_job = jobs.LocalTaskJob(
@@ -396,6 +403,7 @@ def run(args, dag=None):
                 session.add(pickle)
                 session.commit()
                 pickle_id = pickle.id
+                # TODO: This should be written to a log
                 print((
                           'Pickled dag {dag} '
                           'as pickle_id:{pickle_id}').format(**locals()))
@@ -427,7 +435,7 @@ def run(args, dag=None):
     # might subsequently read from the log to insert into S3 or
     # Google cloud storage. Explicitly close the handler is
     # needed in order to upload to remote storage services.
-    for handler in logger.handlers:
+    for handler in log.handlers:
         handler.flush()
         handler.close()
 
@@ -449,6 +457,7 @@ def task_failed_deps(args):
 
     dep_context = DepContext(deps=SCHEDULER_DEPS)
     failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context))
+    # TODO, Do we want to print or log this
     if failed_deps:
         print("Task instance dependencies not met:")
         for dep in failed_deps:
@@ -605,8 +614,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
 
     def start_refresh(gunicorn_master_proc):
         batch_size = conf.getint('webserver', 'worker_refresh_batch_size')
-        logging.debug('%s doing a refresh of %s workers',
-                      state, batch_size)
+        log.debug('%s doing a refresh of %s workers', state, batch_size)
         sys.stdout.flush()
         sys.stderr.flush()
 
@@ -628,14 +636,14 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
 
         # Whenever some workers are not ready, wait until all workers are ready
         if num_ready_workers_running < num_workers_running:
-            logging.debug('%s some workers are starting up, waiting...', state)
+            log.debug('%s some workers are starting up, waiting...', state)
             sys.stdout.flush()
             time.sleep(1)
 
         # Kill a worker gracefully by asking gunicorn to reduce number of workers
         elif num_workers_running > num_workers_expected:
             excess = num_workers_running - num_workers_expected
-            logging.debug('%s killing %s workers', state, excess)
+            log.debug('%s killing %s workers', state, excess)
 
             for _ in range(excess):
                 gunicorn_master_proc.send_signal(signal.SIGTTOU)
@@ -646,7 +654,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
         # Start a new worker by asking gunicorn to increase number of workers
         elif num_workers_running == num_workers_expected:
             refresh_interval = conf.getint('webserver', 'worker_refresh_interval')
-            logging.debug(
+            log.debug(
                 '%s sleeping for %ss starting doing a refresh...',
                 state, refresh_interval
             )
@@ -655,7 +663,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected):
 
         else:
             # num_ready_workers_running == num_workers_running < num_workers_expected
-            logging.error((
+            log.error((
                 "%s some workers seem to have died and gunicorn"
                 "did not restart them as expected"
             ), state)
@@ -770,7 +778,7 @@ def webserver(args):
                             gunicorn_master_proc_pid = int(f.read())
                             break
                     except IOError:
-                        logging.debug("Waiting for gunicorn's pid file to be created.")
+                        log.debug("Waiting for gunicorn's pid file to be created.")
                         time.sleep(0.1)
 
                 gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
@@ -896,8 +904,6 @@ def resetdb(args):
     if args.yes or input(
         "This will drop existing tables if they exist. "
         "Proceed? (y/n)").upper() == "Y":
-        logging.basicConfig(level=settings.LOGGING_LEVEL,
-                            format=settings.SIMPLE_LOG_FORMAT)
         db_utils.resetdb()
     else:
         print("Bail.")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 460d975..db196f9 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -19,7 +19,6 @@ from __future__ import unicode_literals
 
 import copy
 import errno
-import logging
 import os
 import six
 import subprocess
@@ -28,6 +27,9 @@ import shlex
 import sys
 
 from future import standard_library
+
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
 standard_library.install_aliases()
 
 from builtins import str
@@ -36,6 +38,8 @@ from six.moves import configparser
 
 from airflow.exceptions import AirflowConfigException
 
+log = LoggingMixin().logger
+
 # show Airflow's deprecation warnings
 warnings.filterwarnings(
     action='default', category=DeprecationWarning, module='airflow')
@@ -198,8 +202,9 @@ class AirflowConfigParser(ConfigParser):
             return option
 
         else:
-            logging.warning("section/key [{section}/{key}] not found "
-                            "in config".format(**locals()))
+            log.warning(
+                "section/key [{section}/{key}] not found in config".format(**locals())
+            )
 
             raise AirflowConfigException(
                 "section/key [{section}/{key}] not found "
@@ -366,20 +371,22 @@ else:
 TEMPLATE_START = (
     '# ----------------------- TEMPLATE BEGINS HERE -----------------------')
 if not os.path.isfile(TEST_CONFIG_FILE):
-    logging.info(
-        'Creating new Airflow config file for unit tests in: {}'.format(
-            TEST_CONFIG_FILE))
+    log.info(
+        'Creating new Airflow config file for unit tests in: %s', TEST_CONFIG_FILE
+    )
     with open(TEST_CONFIG_FILE, 'w') as f:
         cfg = parameterized_config(TEST_CONFIG)
         f.write(cfg.split(TEMPLATE_START)[-1].strip())
 if not os.path.isfile(AIRFLOW_CONFIG):
-    logging.info('Creating new Airflow config file in: {}'.format(
-        AIRFLOW_CONFIG))
+    log.info(
+        'Creating new Airflow config file in: %s',
+        AIRFLOW_CONFIG
+    )
     with open(AIRFLOW_CONFIG, 'w') as f:
         cfg = parameterized_config(DEFAULT_CONFIG)
         f.write(cfg.split(TEMPLATE_START)[-1].strip())
 
-logging.info("Reading the config from " + AIRFLOW_CONFIG)
+log.info("Reading the config from %s", AIRFLOW_CONFIG)
 
 conf = AirflowConfigParser()
 conf.read(AIRFLOW_CONFIG)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/github_enterprise_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/github_enterprise_auth.py b/airflow/contrib/auth/backends/github_enterprise_auth.py
index 91126c7..459e9c9 100644
--- a/airflow/contrib/auth/backends/github_enterprise_auth.py
+++ b/airflow/contrib/auth/backends/github_enterprise_auth.py
@@ -11,8 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging
-
 import flask_login
 
 # Need to expose these downstream
@@ -29,8 +27,9 @@ from flask_oauthlib.client import OAuth
 
 from airflow import models, configuration, settings
 from airflow.configuration import AirflowConfigException
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
-_log = logging.getLogger(__name__)
+log = LoggingMixin().logger
 
 
 def get_config_param(param):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/google_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/google_auth.py b/airflow/contrib/auth/backends/google_auth.py
index 70c8e13..f38f725 100644
--- a/airflow/contrib/auth/backends/google_auth.py
+++ b/airflow/contrib/auth/backends/google_auth.py
@@ -11,8 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging
-
 import flask_login
 
 # Need to expose these downstream
@@ -28,9 +26,9 @@ from flask import url_for, redirect, request
 from flask_oauthlib.client import OAuth
 
 from airflow import models, configuration, settings
-from airflow.configuration import AirflowConfigException
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
-_log = logging.getLogger(__name__)
+log = LoggingMixin().logger
 
 
 def get_config_param(param):
@@ -106,7 +104,7 @@ class GoogleAuthBackend(object):
                                     self.oauth_callback)
 
     def login(self, request):
-        _log.debug('Redirecting user to Google login')
+        log.debug('Redirecting user to Google login')
         return self.google_oauth.authorize(callback=url_for(
             'google_oauth_callback',
             _external=True,
@@ -142,7 +140,7 @@ class GoogleAuthBackend(object):
         return GoogleUser(user)
 
     def oauth_callback(self):
-        _log.debug('Google OAuth callback called')
+        log.debug('Google OAuth callback called')
 
         next_url = request.args.get('next') or url_for('admin.index')
 
@@ -162,7 +160,6 @@ class GoogleAuthBackend(object):
                 return redirect(url_for('airflow.noaccess'))
 
         except AuthenticationError:
-            _log.exception('')
             return redirect(url_for('airflow.noaccess'))
 
         session = settings.Session()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/kerberos_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/kerberos_auth.py b/airflow/contrib/auth/backends/kerberos_auth.py
index e381059..ffb711f 100644
--- a/airflow/contrib/auth/backends/kerberos_auth.py
+++ b/airflow/contrib/auth/backends/kerberos_auth.py
@@ -29,8 +29,7 @@ from flask import url_for, redirect
 from airflow import settings
 from airflow import models
 from airflow import configuration
-
-import logging
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 login_manager = flask_login.LoginManager()
 login_manager.login_view = 'airflow.login'  # Calls login() below
@@ -41,7 +40,7 @@ class AuthenticationError(Exception):
     pass
 
 
-class KerberosUser(models.User):
+class KerberosUser(models.User, LoggingMixin):
     def __init__(self, user):
         self.user = user
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/ldap_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/ldap_auth.py b/airflow/contrib/auth/backends/ldap_auth.py
index 341f710..8ce0875 100644
--- a/airflow/contrib/auth/backends/ldap_auth.py
+++ b/airflow/contrib/auth/backends/ldap_auth.py
@@ -30,16 +30,16 @@ from airflow import models
 from airflow import configuration
 from airflow.configuration import AirflowConfigException
 
-import logging
-
 import traceback
 import re
 
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
 login_manager = flask_login.LoginManager()
 login_manager.login_view = 'airflow.login'  # Calls login() below
 login_manager.login_message = None
 
-LOG = logging.getLogger(__name__)
+log = LoggingMixin().logger
 
 
 class AuthenticationError(Exception):
@@ -64,7 +64,7 @@ def get_ldap_connection(dn=None, password=None):
     conn = Connection(server, native(dn), native(password))
 
     if not conn.bind():
-        LOG.error("Cannot bind to ldap server: %s ", conn.last_error)
+        log.error("Cannot bind to ldap server: %s ", conn.last_error)
         raise AuthenticationError("Cannot bind to ldap server")
 
     return conn
@@ -74,7 +74,7 @@ def group_contains_user(conn, search_base, group_filter, user_name_attr, usernam
     search_filter = '(&({0}))'.format(group_filter)
     if not conn.search(native(search_base), native(search_filter),
                        attributes=[native(user_name_attr)]):
-        LOG.warning("Unable to find group for %s %s", search_base, search_filter)
+        log.warning("Unable to find group for %s %s", search_base, search_filter)
     else:
         for resp in conn.response:
             if (
@@ -95,11 +95,11 @@ def groups_user(conn, search_base, user_filter, user_name_att, username):
         memberof_attr = "memberOf"
     res = conn.search(native(search_base), native(search_filter), attributes=[native(memberof_attr)])
     if not res:
-        LOG.info("Cannot find user %s", username)
+        log.info("Cannot find user %s", username)
         raise AuthenticationError("Invalid username or password")
 
     if conn.response and memberof_attr not in conn.response[0]["attributes"]:
-        LOG.warning("""Missing attribute "%s" when looked-up in Ldap database.
+        log.warning("""Missing attribute "%s" when looked-up in Ldap database.
         The user does not seem to be a member of a group and therefore won't see any dag
         if the option filter_by_owner=True and owner_mode=ldapgroup are set""", memberof_attr)
         return []
@@ -111,7 +111,7 @@ def groups_user(conn, search_base, user_filter, user_name_att, username):
     try:
         groups_list = [regex.search(i).group(1) for i in user_groups]
     except IndexError:
-        LOG.warning("Parsing error when retrieving the user's group(s)."
+        log.warning("Parsing error when retrieving the user's group(s)."
                     " Check if the user belongs to at least one group"
                     " or if the user's groups name do not contain special characters")
 
@@ -134,7 +134,7 @@ class LdapUser(models.User):
                                                  user.username)
         except AirflowConfigException:
             self.superuser = True
-            LOG.debug("Missing configuration for superuser settings.  Skipping.")
+            log.debug("Missing configuration for superuser settings.  Skipping.")
 
         try:
             self.data_profiler = group_contains_user(conn,
@@ -144,7 +144,7 @@ class LdapUser(models.User):
                                                      user.username)
         except AirflowConfigException:
             self.data_profiler = True
-            LOG.debug("Missing configuration for dataprofiler settings. Skipping")
+            log.debug("Missing configuration for dataprofiler settings. Skipping")
 
         # Load the ldap group(s) a user belongs to
         try:
@@ -154,7 +154,7 @@ class LdapUser(models.User):
                                            configuration.get("ldap", "user_name_attr"),
                                            user.username)
         except AirflowConfigException:
-            LOG.debug("Missing configuration for ldap settings. Skipping")
+            log.debug("Missing configuration for ldap settings. Skipping")
 
     @staticmethod
     def try_login(username, password):
@@ -185,7 +185,7 @@ class LdapUser(models.User):
 
         # todo: use list or result?
         if not res:
-            LOG.info("Cannot find user %s", username)
+            log.info("Cannot find user %s", username)
             raise AuthenticationError("Invalid username or password")
 
         entry = conn.response[0]
@@ -200,14 +200,14 @@ class LdapUser(models.User):
         try:
             conn = get_ldap_connection(entry['dn'], password)
         except KeyError as e:
-            LOG.error("""
+            log.error("""
             Unable to parse LDAP structure. If you're using Active Directory and not specifying an OU, you must set search_scope=SUBTREE in airflow.cfg.
             %s
             """ % traceback.format_exc())
             raise LdapException("Could not parse LDAP structure. Try setting search_scope in airflow.cfg, or check logs")
 
         if not conn:
-            LOG.info("Password incorrect for user %s", username)
+            log.info("Password incorrect for user %s", username)
             raise AuthenticationError("Invalid username or password")
 
     def is_active(self):
@@ -237,7 +237,7 @@ class LdapUser(models.User):
 
 @login_manager.user_loader
 def load_user(userid):
-    LOG.debug("Loading user %s", userid)
+    log.debug("Loading user %s", userid)
     if not userid or userid == 'None':
         return None
 
@@ -270,7 +270,7 @@ def login(self, request):
 
     try:
         LdapUser.try_login(username, password)
-        LOG.info("User %s successfully authenticated", username)
+        log.info("User %s successfully authenticated", username)
 
         session = settings.Session()
         user = session.query(models.User).filter(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/auth/backends/password_auth.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/auth/backends/password_auth.py b/airflow/contrib/auth/backends/password_auth.py
index 000b986..3ad2a8b 100644
--- a/airflow/contrib/auth/backends/password_auth.py
+++ b/airflow/contrib/auth/backends/password_auth.py
@@ -32,15 +32,13 @@ from sqlalchemy.ext.hybrid import hybrid_property
 
 from airflow import settings
 from airflow import models
-from airflow import configuration
-
-import logging
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 login_manager = flask_login.LoginManager()
 login_manager.login_view = 'airflow.login'  # Calls login() below
 login_manager.login_message = None
 
-LOG = logging.getLogger(__name__)
+log = LoggingMixin().logger
 PY3 = version_info[0] == 3
 
 
@@ -94,7 +92,7 @@ class PasswordUser(models.User):
 
 @login_manager.user_loader
 def load_user(userid):
-    LOG.debug("Loading user %s", userid)
+    log.debug("Loading user %s", userid)
     if not userid or userid == 'None':
         return None
 
@@ -137,7 +135,7 @@ def login(self, request):
         if not user.authenticate(password):
             session.close()
             raise AuthenticationError()
-        LOG.info("User %s successfully authenticated", username)
+        log.info("User %s successfully authenticated", username)
 
         flask_login.login_user(user)
         session.commit()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/executors/mesos_executor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/executors/mesos_executor.py b/airflow/contrib/executors/mesos_executor.py
index 49788fc..19d72ed 100644
--- a/airflow/contrib/executors/mesos_executor.py
+++ b/airflow/contrib/executors/mesos_executor.py
@@ -13,9 +13,12 @@
 # limitations under the License.
 
 from future import standard_library
+
+from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.www.utils import LoginMixin
+
 standard_library.install_aliases()
 from builtins import str
-import logging
 from queue import Queue
 
 import mesos.interface
@@ -41,7 +44,7 @@ def get_framework_name():
 
 # AirflowMesosScheduler, implements Mesos Scheduler interface
 # To schedule airflow jobs on mesos
-class AirflowMesosScheduler(mesos.interface.Scheduler):
+class AirflowMesosScheduler(mesos.interface.Scheduler, LoggingMixin):
     """
     Airflow Mesos scheduler implements mesos scheduler interface
     to schedule airflow tasks on mesos.
@@ -49,7 +52,6 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
     'airflow run <dag_id> <task_instance_id> <start_date> --local -p=<pickle>'
     to run on a mesos slave.
     """
-
     def __init__(self,
                  task_queue,
                  result_queue,
@@ -63,7 +65,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
         self.task_key_map = {}
 
     def registered(self, driver, frameworkId, masterInfo):
-        logging.info("AirflowScheduler registered to mesos with framework ID %s", frameworkId.value)
+        self.logger.info("AirflowScheduler registered to Mesos with framework ID %s", frameworkId.value)
 
         if configuration.getboolean('mesos', 'CHECKPOINT') and configuration.get('mesos', 'FAILOVER_TIMEOUT'):
             # Import here to work around a circular import error
@@ -84,25 +86,25 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
             Session.remove()
 
     def reregistered(self, driver, masterInfo):
-        logging.info("AirflowScheduler re-registered to mesos")
+        self.logger.info("AirflowScheduler re-registered to mesos")
 
     def disconnected(self, driver):
-        logging.info("AirflowScheduler disconnected from mesos")
+        self.logger.info("AirflowScheduler disconnected from mesos")
 
     def offerRescinded(self, driver, offerId):
-        logging.info("AirflowScheduler offer %s rescinded", str(offerId))
+        self.logger.info("AirflowScheduler offer %s rescinded", str(offerId))
 
     def frameworkMessage(self, driver, executorId, slaveId, message):
-        logging.info("AirflowScheduler received framework message %s", message)
+        self.logger.info("AirflowScheduler received framework message %s", message)
 
     def executorLost(self, driver, executorId, slaveId, status):
-        logging.warning("AirflowScheduler executor %s lost", str(executorId))
+        self.logger.warning("AirflowScheduler executor %s lost", str(executorId))
 
     def slaveLost(self, driver, slaveId):
-        logging.warning("AirflowScheduler slave %s lost", str(slaveId))
+        self.logger.warning("AirflowScheduler slave %s lost", str(slaveId))
 
     def error(self, driver, message):
-        logging.error("AirflowScheduler driver aborted %s", message)
+        self.logger.error("AirflowScheduler driver aborted %s", message)
         raise AirflowException("AirflowScheduler driver aborted %s" % message)
 
     def resourceOffers(self, driver, offers):
@@ -116,7 +118,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
                 elif resource.name == "mem":
                     offerMem += resource.scalar.value
 
-            logging.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
+            self.logger.info("Received offer %s with cpus: %s and mem: %s", offer.id.value, offerCpus, offerMem)
 
             remainingCpus = offerCpus
             remainingMem = offerMem
@@ -129,7 +131,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
                 self.task_counter += 1
                 self.task_key_map[str(tid)] = key
 
-                logging.info("Launching task %d using offer %s", tid, offer.id.value)
+                self.logger.info("Launching task %d using offer %s", tid, offer.id.value)
 
                 task = mesos_pb2.TaskInfo()
                 task.task_id.value = str(tid)
@@ -159,15 +161,17 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
             driver.launchTasks(offer.id, tasks)
 
     def statusUpdate(self, driver, update):
-        logging.info("Task %s is in state %s, data %s",
-                     update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data))
+        self.logger.info(
+            "Task %s is in state %s, data %s",
+            update.task_id.value, mesos_pb2.TaskState.Name(update.state), str(update.data)
+        )
 
         try:
             key = self.task_key_map[update.task_id.value]
         except KeyError:
             # The map may not contain an item if the framework re-registered after a failover.
             # Discard these tasks.
-            logging.warning("Unrecognised task key %s" % update.task_id.value)
+            self.logger.warning("Unrecognised task key %s", update.task_id.value)
             return
 
         if update.state == mesos_pb2.TASK_FINISHED:
@@ -181,7 +185,7 @@ class AirflowMesosScheduler(mesos.interface.Scheduler):
             self.task_queue.task_done()
 
 
-class MesosExecutor(BaseExecutor):
+class MesosExecutor(BaseExecutor, LoginMixin):
     """
     MesosExecutor allows distributing the execution of task
     instances to multiple mesos workers.
@@ -192,7 +196,6 @@ class MesosExecutor(BaseExecutor):
     elastic distributed systems to easily be built and run effectively.
     See http://mesos.apache.org/
     """
-
     def start(self):
         self.task_queue = Queue()
         self.result_queue = Queue()
@@ -200,7 +203,7 @@ class MesosExecutor(BaseExecutor):
         framework.user = ''
 
         if not configuration.get('mesos', 'MASTER'):
-            logging.error("Expecting mesos master URL for mesos executor")
+            self.logger.error("Expecting mesos master URL for mesos executor")
             raise AirflowException("mesos.master not provided for mesos executor")
 
         master = configuration.get('mesos', 'MASTER')
@@ -236,17 +239,19 @@ class MesosExecutor(BaseExecutor):
         else:
             framework.checkpoint = False
 
-        logging.info('MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s',
-            master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint))
+        self.logger.info(
+            'MesosFramework master : %s, name : %s, cpu : %s, mem : %s, checkpoint : %s',
+            master, framework.name, str(task_cpu), str(task_memory), str(framework.checkpoint)
+        )
 
         implicit_acknowledgements = 1
 
         if configuration.getboolean('mesos', 'AUTHENTICATE'):
             if not configuration.get('mesos', 'DEFAULT_PRINCIPAL'):
-                logging.error("Expecting authentication principal in the environment")
+                self.logger.error("Expecting authentication principal in the environment")
                 raise AirflowException("mesos.default_principal not provided in authenticated mode")
             if not configuration.get('mesos', 'DEFAULT_SECRET'):
-                logging.error("Expecting authentication secret in the environment")
+                self.logger.error("Expecting authentication secret in the environment")
                 raise AirflowException("mesos.default_secret not provided in authenticated mode")
 
             credential = mesos_pb2.Credential()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index d2ce2b0..497fa28 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -18,7 +18,6 @@ This module contains a BigQuery Hook, as well as a very basic PEP 249
 implementation for BigQuery.
 """
 
-import logging
 import time
 
 from apiclient.discovery import build, HttpError
@@ -33,11 +32,10 @@ from past.builtins import basestring
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 from airflow.hooks.dbapi_hook import DbApiHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
-logging.getLogger("bigquery").setLevel(logging.INFO)
 
-
-class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
+class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
     """
     Interact with BigQuery. This hook uses the Google Cloud Platform
     connection.
@@ -178,13 +176,12 @@ class BigQueryConnection(object):
             "BigQueryConnection does not have transactions")
 
 
-class BigQueryBaseCursor(object):
+class BigQueryBaseCursor(LoggingMixin):
     """
     The BigQuery base cursor contains helper methods to execute queries against
     BigQuery. The methods can be used directly by operators, in cases where a
     PEP 249 cursor isn't needed.
     """
-
     def __init__(self, service, project_id):
         self.service = service
         self.project_id = project_id
@@ -290,10 +287,12 @@ class BigQueryBaseCursor(object):
         :param print_header: Whether to print a header for a CSV file extract.
         :type print_header: boolean
         """
+
         source_project, source_dataset, source_table = \
             _split_tablename(table_input=source_project_dataset_table,
                              default_project_id=self.project_id,
                              var_name='source_project_dataset_table')
+
         configuration = {
             'extract': {
                 'sourceTable': {
@@ -500,7 +499,7 @@ class BigQueryBaseCursor(object):
                     "'WRITE_APPEND' or 'WRITE_TRUNCATE'."
                 )
             else:
-                logging.info(
+                self.logger.info(
                     "Adding experimental "
                     "'schemaUpdateOptions': {0}".format(schema_update_options)
                 )
@@ -577,12 +576,12 @@ class BigQueryBaseCursor(object):
                             )
                         )
                 else:
-                    logging.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
+                    self.logger.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
                     time.sleep(5)
 
             except HttpError as err:
                 if err.resp.status in [500, 503]:
-                    logging.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
+                    self.logger.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
                     time.sleep(5)
                 else:
                     raise Exception(
@@ -661,14 +660,14 @@ class BigQueryBaseCursor(object):
                         datasetId=deletion_dataset,
                         tableId=deletion_table) \
                 .execute()
-            logging.info('Deleted table %s:%s.%s.',
+            self.logger.info('Deleted table %s:%s.%s.',
                          deletion_project, deletion_dataset, deletion_table)
         except HttpError:
             if not ignore_if_missing:
                 raise Exception(
                     'Table deletion failed. Table does not exist.')
             else:
-                logging.info('Table does not exist. Skipping.')
+                self.logger.info('Table does not exist. Skipping.')
 
 
     def run_table_upsert(self, dataset_id, table_resource, project_id=None):
@@ -695,8 +694,10 @@ class BigQueryBaseCursor(object):
             for table in tables_list_resp.get('tables', []):
                 if table['tableReference']['tableId'] == table_id:
                     # found the table, do update
-                    logging.info('table %s:%s.%s exists, updating.',
-                                 project_id, dataset_id, table_id)
+                    self.logger.info(
+                        'Table %s:%s.%s exists, updating.',
+                        project_id, dataset_id, table_id
+                    )
                     return self.service.tables().update(projectId=project_id,
                                                         datasetId=dataset_id,
                                                         tableId=table_id,
@@ -711,8 +712,10 @@ class BigQueryBaseCursor(object):
             # If there is no next page, then the table doesn't exist.
             else:
                 # do insert
-                logging.info('table %s:%s.%s does not exist. creating.',
-                             project_id, dataset_id, table_id)
+                self.logger.info(
+                    'Table %s:%s.%s does not exist. creating.',
+                    project_id, dataset_id, table_id
+                )
                 return self.service.tables().insert(projectId=project_id,
                                                     datasetId=dataset_id,
                                                     body=table_resource).execute()
@@ -756,18 +759,20 @@ class BigQueryBaseCursor(object):
                                 'tableId': view_table}}
         # check to see if the view we want to add already exists.
         if view_access not in access:
-            logging.info('granting table %s:%s.%s authorized view access to %s:%s dataset.',
-                         view_project, view_dataset, view_table,
-                         source_project, source_dataset)
+            self.logger.info(
+                'Granting table %s:%s.%s authorized view access to %s:%s dataset.',
+                view_project, view_dataset, view_table, source_project, source_dataset
+            )
             access.append(view_access)
             return self.service.datasets().patch(projectId=source_project,
                                                  datasetId=source_dataset,
                                                  body={'access': access}).execute()
         else:
             # if view is already in access, do nothing.
-            logging.info('table %s:%s.%s already has authorized view access to %s:%s dataset.',
-                         view_project, view_dataset, view_table,
-                         source_project, source_dataset)
+            self.logger.info(
+                'Table %s:%s.%s already has authorized view access to %s:%s dataset.',
+                view_project, view_dataset, view_table, source_project, source_dataset
+            )
             return source_dataset_resource
 
 
@@ -1027,10 +1032,12 @@ def _split_tablename(table_input, default_project_id, var_name=None):
 
     if project_id is None:
         if var_name is not None:
-            logging.info(
-                'project not included in {var}: '
-                '{input}; using project "{project}"'.format(
-                    var=var_name, input=table_input, project=default_project_id))
+            log = LoggingMixin().logger
+            log.info(
+                'Project not included in {var}: {input}; using project "{project}"'.format(
+                    var=var_name, input=table_input, project=default_project_id
+                )
+            )
         project_id = default_project_id
 
     return project_id, dataset_id, table_id

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/cloudant_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/cloudant_hook.py b/airflow/contrib/hooks/cloudant_hook.py
index 6cea26f..d9db08d 100644
--- a/airflow/contrib/hooks/cloudant_hook.py
+++ b/airflow/contrib/hooks/cloudant_hook.py
@@ -15,10 +15,10 @@
 from past.builtins import unicode
 
 import cloudant
-import logging
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
 class CloudantHook(BaseHook):
@@ -35,9 +35,11 @@ class CloudantHook(BaseHook):
         def _str(s):
             # cloudant-python doesn't support unicode.
             if isinstance(s, unicode):
-                logging.debug(('cloudant-python does not support unicode. '
-                               'Encoding %s as ascii using "ignore".'),
-                              s)
+                log = LoggingMixin().logger
+                log.debug(
+                    'cloudant-python does not support unicode. Encoding %s as ascii using "ignore".',
+                    s
+                )
                 return s.encode('ascii', 'ignore')
 
             return s

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/databricks_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/databricks_hook.py b/airflow/contrib/hooks/databricks_hook.py
index 18e20c4..7b20433 100644
--- a/airflow/contrib/hooks/databricks_hook.py
+++ b/airflow/contrib/hooks/databricks_hook.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-import logging
 import requests
 
 from airflow import __version__
@@ -22,6 +20,7 @@ from airflow.hooks.base_hook import BaseHook
 from requests import exceptions as requests_exceptions
 from requests.auth import AuthBase
 
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 try:
     from urllib import parse as urlparse
@@ -35,7 +34,7 @@ CANCEL_RUN_ENDPOINT = ('POST', 'api/2.0/jobs/runs/cancel')
 USER_AGENT_HEADER = {'user-agent': 'airflow-{v}'.format(v=__version__)}
 
 
-class DatabricksHook(BaseHook):
+class DatabricksHook(BaseHook, LoggingMixin):
     """
     Interact with Databricks.
     """
@@ -101,10 +100,10 @@ class DatabricksHook(BaseHook):
             host=self._parse_host(self.databricks_conn.host),
             endpoint=endpoint)
         if 'token' in self.databricks_conn.extra_dejson:
-            logging.info('Using token auth.')
+            self.logger.info('Using token auth.')
             auth = _TokenAuth(self.databricks_conn.extra_dejson['token'])
         else:
-            logging.info('Using basic auth.')
+            self.logger.info('Using basic auth.')
             auth = (self.databricks_conn.login, self.databricks_conn.password)
         if method == 'GET':
             request_func = requests.get
@@ -130,8 +129,10 @@ class DatabricksHook(BaseHook):
                         response.content, response.status_code))
             except (requests_exceptions.ConnectionError,
                     requests_exceptions.Timeout) as e:
-                logging.error(('Attempt {0} API Request to Databricks failed ' +
-                              'with reason: {1}').format(attempt_num, e))
+                self.logger.error(
+                    'Attempt %s API Request to Databricks failed with reason: %s',
+                    attempt_num, e
+                )
         raise AirflowException(('API requests to Databricks failed {} times. ' +
                                'Giving up.').format(self.retry_limit))
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/datadog_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datadog_hook.py b/airflow/contrib/hooks/datadog_hook.py
index 2125701..0f5af00 100644
--- a/airflow/contrib/hooks/datadog_hook.py
+++ b/airflow/contrib/hooks/datadog_hook.py
@@ -13,14 +13,14 @@
 # limitations under the License.
 
 import time
-import logging
-
 from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
 from datadog import initialize, api
 
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
 
-class DatadogHook(BaseHook):
+class DatadogHook(BaseHook, LoggingMixin):
     """
     Uses datadog API to send metrics of practically anything measurable,
     so it's possible to track # of db records inserted/deleted, records read
@@ -32,7 +32,6 @@ class DatadogHook(BaseHook):
     :param datadog_conn_id: The connection to datadog, containing metadata for api keys.
     :param datadog_conn_id: string
     """
-
     def __init__(self, datadog_conn_id='datadog_default'):
         conn = self.get_connection(datadog_conn_id)
         self.api_key = conn.extra_dejson.get('api_key', None)
@@ -48,7 +47,7 @@ class DatadogHook(BaseHook):
         if self.app_key is None:
             raise AirflowException("app_key must be specified in the Datadog connection details")
 
-        logging.info("Setting up api keys for datadog")
+        self.logger.info("Setting up api keys for Datadog")
         options = {
             'api_key': self.api_key,
             'app_key': self.app_key
@@ -57,8 +56,8 @@ class DatadogHook(BaseHook):
 
     def validate_response(self, response):
         if response['status'] != 'ok':
-            logging.error("Data dog returned: " + response)
-            raise AirflowException("Error status received from datadog")
+            self.logger.error("Datadog returned: %s", response)
+            raise AirflowException("Error status received from Datadog")
 
     def send_metric(self, metric_name, datapoint, tags=None):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/datastore_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/datastore_hook.py b/airflow/contrib/hooks/datastore_hook.py
index 7a4386a..2ff1600 100644
--- a/airflow/contrib/hooks/datastore_hook.py
+++ b/airflow/contrib/hooks/datastore_hook.py
@@ -15,7 +15,6 @@
 
 import json
 import time
-import logging
 from apiclient.discovery import build
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
@@ -137,7 +136,7 @@ class DatastoreHook(GoogleCloudBaseHook):
             result = self.get_operation(name)
             state = result['metadata']['common']['state']
             if state == 'PROCESSING':
-                logging.info('Operation is processing. Re-polling state in {} seconds'
+                self.logger.info('Operation is processing. Re-polling state in {} seconds'
                         .format(polling_interval_in_seconds))
                 time.sleep(polling_interval_in_seconds)
             else:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/ftp_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ftp_hook.py b/airflow/contrib/hooks/ftp_hook.py
index 148811f..a6b3181 100644
--- a/airflow/contrib/hooks/ftp_hook.py
+++ b/airflow/contrib/hooks/ftp_hook.py
@@ -15,11 +15,12 @@
 
 import datetime
 import ftplib
-import logging
 import os.path
 from airflow.hooks.base_hook import BaseHook
 from past.builtins import basestring
 
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
 
 def mlsd(conn, path="", facts=None):
     """
@@ -54,7 +55,7 @@ def mlsd(conn, path="", facts=None):
         yield (name, entry)
 
 
-class FTPHook(BaseHook):
+class FTPHook(BaseHook, LoggingMixin):
     """
     Interact with FTP.
 
@@ -166,10 +167,9 @@ class FTPHook(BaseHook):
 
         remote_path, remote_file_name = os.path.split(remote_full_path)
         conn.cwd(remote_path)
-        logging.info('Retrieving file from FTP: {}'.format(remote_full_path))
+        self.logger.info('Retrieving file from FTP: %s', remote_full_path)
         conn.retrbinary('RETR %s' % remote_file_name, output_handle.write)
-        logging.info('Finished retrieving file from FTP: {}'.format(
-            remote_full_path))
+        self.logger.info('Finished retrieving file from FTP: %s', remote_full_path)
 
         if is_path:
             output_handle.close()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_api_base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py
index 48c5979..7476c90 100644
--- a/airflow/contrib/hooks/gcp_api_base_hook.py
+++ b/airflow/contrib/hooks/gcp_api_base_hook.py
@@ -12,18 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-import logging
-import json
-
 import httplib2
 from oauth2client.client import GoogleCredentials
 from oauth2client.service_account import ServiceAccountCredentials
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
-class GoogleCloudBaseHook(BaseHook):
+
+class GoogleCloudBaseHook(BaseHook, LoggingMixin):
     """
     A base hook for Google cloud-related hooks. Google cloud has a shared REST
     API client that is built in the same way no matter which service you use.
@@ -43,7 +41,6 @@ class GoogleCloudBaseHook(BaseHook):
 
     Legacy P12 key files are not supported.
     """
-
     def __init__(self, conn_id, delegate_to=None):
         """
         :param conn_id: The connection ID to use when fetching connection info.
@@ -69,7 +66,7 @@ class GoogleCloudBaseHook(BaseHook):
             kwargs['sub'] = self.delegate_to
 
         if not key_path:
-            logging.info('Getting connection using `gcloud auth` user, since no key file '
+            self.logger.info('Getting connection using `gcloud auth` user, since no key file '
                          'is defined for hook.')
             credentials = GoogleCredentials.get_application_default()
         else:
@@ -77,7 +74,7 @@ class GoogleCloudBaseHook(BaseHook):
                 raise AirflowException('Scope should be defined when using a key file.')
             scopes = [s.strip() for s in scope.split(',')]
             if key_path.endswith('.json'):
-                logging.info('Getting connection using a JSON key file.')
+                self.logger.info('Getting connection using a JSON key file.')
                 credentials = ServiceAccountCredentials\
                     .from_json_keyfile_name(key_path, scopes)
             elif key_path.endswith('.p12'):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_dataflow_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index fc73288..66dfb07 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -11,8 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
 import select
 import subprocess
 import time
@@ -21,10 +19,10 @@ import uuid
 from apiclient.discovery import build
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
-class _DataflowJob(object):
-
+class _DataflowJob(LoggingMixin):
     def __init__(self, dataflow, project_number, name):
         self._dataflow = dataflow
         self._project_number = project_number
@@ -49,11 +47,15 @@ class _DataflowJob(object):
             job = self._dataflow.projects().jobs().get(projectId=self._project_number,
                                                        jobId=self._job_id).execute()
         if 'currentState' in job:
-            logging.info('Google Cloud DataFlow job %s is %s', job['name'],
-                         job['currentState'])
+            self.logger.info(
+                'Google Cloud DataFlow job %s is %s',
+                job['name'], job['currentState']
+            )
         else:
-            logging.info('Google Cloud DataFlow with job_id %s has name %s', self._job_id,
-                         job['name'])
+            self.logger.info(
+                'Google Cloud DataFlow with job_id %s has name %s',
+                self._job_id, job['name']
+            )
         return job
 
     def wait_for_done(self):
@@ -70,7 +72,7 @@ class _DataflowJob(object):
                 elif 'JOB_STATE_RUNNING' == self._job['currentState']:
                     time.sleep(10)
                 else:
-                    logging.debug(str(self._job))
+                    self.logger.debug(str(self._job))
                     raise Exception(
                         "Google Cloud Dataflow job {} was unknown state: {}".format(
                             self._job['name'], self._job['currentState']))
@@ -83,8 +85,7 @@ class _DataflowJob(object):
         return self._job
 
 
-class _Dataflow(object):
-
+class _Dataflow(LoggingMixin):
     def __init__(self, cmd):
         self._proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
@@ -105,15 +106,15 @@ class _Dataflow(object):
 
     def wait_for_done(self):
         reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
-        logging.info("Start waiting for DataFlow process to complete.")
+        self.logger.info("Start waiting for DataFlow process to complete.")
         while self._proc.poll() is None:
             ret = select.select(reads, [], [], 5)
             if ret is not None:
                 for fd in ret[0]:
                     line = self._line(fd)
-                    logging.debug(line[:-1])
+                    self.logger.debug(line[:-1])
             else:
-                logging.info("Waiting for DataFlow process to complete.")
+                self.logger.info("Waiting for DataFlow process to complete.")
         if self._proc.returncode is not 0:
             raise Exception("DataFlow failed with return code {}".format(
                 self._proc.returncode))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index c1d8993..3a1336e 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -12,16 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import logging
 import time
 import uuid
 
 from apiclient.discovery import build
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
-class _DataProcJob:
+class _DataProcJob(LoggingMixin):
     def __init__(self, dataproc_api, project_id, job):
         self.dataproc_api = dataproc_api
         self.project_id = project_id
@@ -30,8 +30,10 @@ class _DataProcJob:
             region='global',
             body=job).execute()
         self.job_id = self.job['reference']['jobId']
-        logging.info('DataProc job %s is %s', self.job_id,
-                     str(self.job['status']['state']))
+        self.logger.info(
+            'DataProc job %s is %s',
+            self.job_id, str(self.job['status']['state'])
+        )
 
     def wait_for_done(self):
         while True:
@@ -41,21 +43,23 @@ class _DataProcJob:
                 jobId=self.job_id).execute()
             if 'ERROR' == self.job['status']['state']:
                 print(str(self.job))
-                logging.error('DataProc job %s has errors', self.job_id)
-                logging.error(self.job['status']['details'])
-                logging.debug(str(self.job))
+                self.logger.error('DataProc job %s has errors', self.job_id)
+                self.logger.error(self.job['status']['details'])
+                self.logger.debug(str(self.job))
                 return False
             if 'CANCELLED' == self.job['status']['state']:
                 print(str(self.job))
-                logging.warning('DataProc job %s is cancelled', self.job_id)
+                self.logger.warning('DataProc job %s is cancelled', self.job_id)
                 if 'details' in self.job['status']:
-                    logging.warning(self.job['status']['details'])
-                logging.debug(str(self.job))
+                    self.logger.warning(self.job['status']['details'])
+                self.logger.debug(str(self.job))
                 return False
             if 'DONE' == self.job['status']['state']:
                 return True
-            logging.debug('DataProc job %s is %s', self.job_id,
-                          str(self.job['status']['state']))
+            self.logger.debug(
+                'DataProc job %s is %s',
+                self.job_id, str(self.job['status']['state'])
+            )
             time.sleep(5)
 
     def raise_error(self, message=None):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcp_mlengine_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_mlengine_hook.py b/airflow/contrib/hooks/gcp_mlengine_hook.py
index 47d9700..35f31a7 100644
--- a/airflow/contrib/hooks/gcp_mlengine_hook.py
+++ b/airflow/contrib/hooks/gcp_mlengine_hook.py
@@ -13,44 +13,40 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
-import logging
 import random
 import time
-from airflow import settings
-from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from apiclient.discovery import build
 from apiclient import errors
+from apiclient.discovery import build
 from oauth2client.client import GoogleCredentials
 
-logging.getLogger('GoogleCloudMLEngine').setLevel(settings.LOGGING_LEVEL)
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
 def _poll_with_exponential_delay(request, max_n, is_done_func, is_error_func):
+    log = LoggingMixin().logger
 
     for i in range(0, max_n):
         try:
             response = request.execute()
             if is_error_func(response):
                 raise ValueError(
-                    'The response contained an error: {}'.format(response))
+                    'The response contained an error: {}'.format(response)
+                )
             elif is_done_func(response):
-                logging.info('Operation is done: {}'.format(response))
+                log.info('Operation is done: %s', response)
                 return response
             else:
                 time.sleep((2**i) + (random.randint(0, 1000) / 1000))
         except errors.HttpError as e:
             if e.resp.status != 429:
-                logging.info(
-                    'Something went wrong. Not retrying: {}'.format(e))
+                log.info('Something went wrong. Not retrying: %s', format(e))
                 raise
             else:
                 time.sleep((2**i) + (random.randint(0, 1000) / 1000))
 
 
 class MLEngineHook(GoogleCloudBaseHook):
-
     def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
         super(MLEngineHook, self).__init__(gcp_conn_id, delegate_to)
         self._mlengine = self.get_conn()
@@ -107,17 +103,20 @@ class MLEngineHook(GoogleCloudBaseHook):
                 if use_existing_job_fn is not None:
                     existing_job = self._get_job(project_id, job_id)
                     if not use_existing_job_fn(existing_job):
-                        logging.error(
-                            'Job with job_id {} already exist, but it does '
-                            'not match our expectation: {}'.format(
-                                job_id, existing_job))
+                        self.logger.error(
+                            'Job with job_id %s already exist, but it does '
+                            'not match our expectation: %s',
+                            job_id, existing_job
+                        )
                         raise
-                logging.info(
-                    'Job with job_id {} already exist. Will waiting for it to '
-                    'finish'.format(job_id))
+                self.logger.info(
+                    'Job with job_id %s already exist. Will waiting for it to finish',
+                    job_id
+                )
             else:
-                logging.error('Failed to create MLEngine job: {}'.format(e))
+                self.logger.error('Failed to create MLEngine job: {}'.format(e))
                 raise
+
         return self._wait_for_job_done(project_id, job_id)
 
     def _get_job(self, project_id, job_id):
@@ -140,7 +139,7 @@ class MLEngineHook(GoogleCloudBaseHook):
                     # polling after 30 seconds when quota failure occurs
                     time.sleep(30)
                 else:
-                    logging.error('Failed to get MLEngine job: {}'.format(e))
+                    self.logger.error('Failed to get MLEngine job: {}'.format(e))
                     raise
 
     def _wait_for_job_done(self, project_id, job_id, interval=30):
@@ -192,11 +191,10 @@ class MLEngineHook(GoogleCloudBaseHook):
 
         try:
             response = request.execute()
-            logging.info(
-                'Successfully set version: {} to default'.format(response))
+            self.logger.info('Successfully set version: %s to default', response)
             return response
         except errors.HttpError as e:
-            logging.error('Something went wrong: {}'.format(e))
+            self.logger.error('Something went wrong: %s', e)
             raise
 
     def list_versions(self, project_id, model_name):
@@ -264,6 +262,6 @@ class MLEngineHook(GoogleCloudBaseHook):
             return request.execute()
         except errors.HttpError as e:
             if e.resp.status == 404:
-                logging.error('Model was not found: {}'.format(e))
+                self.logger.error('Model was not found: %s', e)
                 return None
             raise

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index b5f3edc..eb17c3b 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -12,17 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-import logging
-
 from apiclient.discovery import build
 from apiclient.http import MediaFileUpload
 from googleapiclient import errors
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
-logging.getLogger("google_cloud_storage").setLevel(logging.INFO)
-
 
 class GoogleCloudStorageHook(GoogleCloudBaseHook):
     """
@@ -187,8 +182,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
                     ts = ts.replace(tzinfo=dateutil.tz.tzutc())
 
                 updated = dateutil.parser.parse(response['updated'])
-                logging.log(logging.INFO, "Verify object date: " + str(updated)
-                            + " > " + str(ts))
+                self.logger.info("Verify object date: %s > %s", updated, ts)
 
                 if updated > ts:
                     return True
@@ -253,7 +247,7 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
             ).execute()
 
             if 'items' not in response:
-                logging.info("No items found for prefix:{}".format(prefix))
+                self.logger.info("No items found for prefix: %s", prefix)
                 break
 
             for item in response['items']:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/jira_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/jira_hook.py b/airflow/contrib/hooks/jira_hook.py
index 148101b..8702608 100644
--- a/airflow/contrib/hooks/jira_hook.py
+++ b/airflow/contrib/hooks/jira_hook.py
@@ -11,24 +11,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from jira import JIRA
 from jira.exceptions import JIRAError
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
-class JiraHook(BaseHook):
+class JiraHook(BaseHook, LoggingMixin):
     """
     Jira interaction hook, a Wrapper around JIRA Python SDK.
 
     :param jira_conn_id: reference to a pre-defined Jira Connection
     :type jira_conn_id: string
     """
-
     def __init__(self,
                  jira_conn_id='jira_default'):
         super(JiraHook, self).__init__(jira_conn_id)
@@ -38,7 +35,7 @@ class JiraHook(BaseHook):
 
     def get_conn(self):
         if not self.client:
-            logging.debug('creating jira client for conn_id: {0}'.format(self.jira_conn_id))
+            self.logger.debug('Creating Jira client for conn_id: %s', self.jira_conn_id)
 
             get_server_info = True
             validate = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index c51a757..1a5e7ec 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -16,12 +16,12 @@
 import os
 import time
 import datetime
-import logging
 import six
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow import configuration
+from airflow.utils.log.LoggingMixin import LoggingMixin
 from airflow.utils.state import State
 
 from qds_sdk.qubole import Qubole
@@ -68,7 +68,7 @@ COMMAND_ARGS = {
 }
 
 
-class QuboleHook(BaseHook):
+class QuboleHook(BaseHook, LoggingMixin):
     def __init__(self, *args, **kwargs):
         conn = self.get_connection(kwargs['qubole_conn_id'])
         Qubole.configure(api_token=conn.password, api_url=conn.host)
@@ -84,31 +84,33 @@ class QuboleHook(BaseHook):
         cmd_id = ti.xcom_pull(key='qbol_cmd_id', task_ids=ti.task_id)
 
         if cmd_id is not None:
-            logger = logging.getLogger("QuboleHook")
             cmd = Command.find(cmd_id)
             if cmd is not None:
+                log = LoggingMixin().logger
                 if cmd.status == 'done':
-                    logger.info('Command ID: %s has been succeeded, hence marking this '
+                    log.info('Command ID: %s has been succeeded, hence marking this '
                                 'TI as Success.', cmd_id)
                     ti.state = State.SUCCESS
                 elif cmd.status == 'running':
-                    logger.info('Cancelling the Qubole Command Id: %s', cmd_id)
+                    log.info('Cancelling the Qubole Command Id: %s', cmd_id)
                     cmd.cancel()
 
     def execute(self, context):
         args = self.cls.parse(self.create_cmd_args(context))
         self.cmd = self.cls.create(**args)
         context['task_instance'].xcom_push(key='qbol_cmd_id', value=self.cmd.id)
-        logging.info("Qubole command created with Id: %s and Status: %s",
-                     self.cmd.id, self.cmd.status)
+        self.logger.info(
+            "Qubole command created with Id: %s and Status: %s",
+            self.cmd.id, self.cmd.status
+        )
 
         while not Command.is_done(self.cmd.status):
             time.sleep(Qubole.poll_interval)
             self.cmd = self.cls.find(self.cmd.id)
-            logging.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
+            self.logger.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
 
         if 'fetch_logs' in self.kwargs and self.kwargs['fetch_logs'] is True:
-            logging.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
+            self.logger.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
 
         if self.cmd.status != 'done':
             raise AirflowException('Command Id: {0} failed with Status: {1}'.format(
@@ -124,7 +126,7 @@ class QuboleHook(BaseHook):
             cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=ti.task_id)
             self.cmd = self.cls.find(cmd_id)
         if self.cls and self.cmd:
-            logging.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
+            self.logger.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
             self.cmd.cancel()
 
     def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/redis_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/redis_hook.py b/airflow/contrib/hooks/redis_hook.py
index 936eff8..a8999d6 100644
--- a/airflow/contrib/hooks/redis_hook.py
+++ b/airflow/contrib/hooks/redis_hook.py
@@ -15,16 +15,14 @@
 """
 RedisHook module
 """
-
-import logging
-
 from redis import StrictRedis
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
-class RedisHook(BaseHook):
+class RedisHook(BaseHook, LoggingMixin):
     """
     Hook to interact with Redis database
     """
@@ -42,7 +40,7 @@ class RedisHook(BaseHook):
         self.port = int(conn.port)
         self.password = conn.password
         self.db = int(conn.extra_dejson.get('db', 0))
-        self.logger = logging.getLogger(__name__)
+
         self.logger.debug(
             '''Connection "{conn}":
             \thost: {host}
@@ -62,11 +60,9 @@ class RedisHook(BaseHook):
         """
         if not self.client:
             self.logger.debug(
-                'generating Redis client for conn_id "{conn}" on '
-                '{host}:{port}:{db}'.format(conn=self.redis_conn_id,
-                                            host=self.host,
-                                            port=self.port,
-                                            db=self.db))
+                'generating Redis client for conn_id "%s" on %s:%s:%s',
+                self.redis_conn_id, self.host, self.port, self.db
+            )
             try:
                 self.client = StrictRedis(
                     host=self.host,