Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/13 07:37:32 UTC

[4/5] incubator-airflow git commit: [AIRFLOW-1582] Improve logging within Airflow
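
The pattern applied throughout this patch: drop the module-level import of logging, mix LoggingMixin into each hook and operator class, and emit messages through self.logger with lazy %s arguments instead of eager str.format(). A minimal, self-contained sketch of that pattern follows; LoggingMixin here is a simplified stand-in for airflow.utils.log.LoggingMixin.LoggingMixin, and SampleHook is a hypothetical class used only for illustration, not code from this commit.

    import logging


    class LoggingMixin(object):
        # Simplified stand-in for airflow.utils.log.LoggingMixin.LoggingMixin.
        @property
        def logger(self):
            # Name the logger after the concrete class, e.g. "__main__.SampleHook".
            return logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)


    class SampleHook(LoggingMixin):
        # Hypothetical hook used only for illustration.
        def make_query(self, query):
            # Lazy %s formatting: interpolation happens only if the record is emitted.
            self.logger.info("Making query to Salesforce: %s", query)


    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO)
        SampleHook().make_query("SELECT Id FROM Account LIMIT 1")

Deferring interpolation to the logging framework avoids building the message string when the record is filtered out by the configured level, and keeps the raw arguments available to handlers.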

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/salesforce_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py
index 67d1605..f2b5fef 100644
--- a/airflow/contrib/hooks/salesforce_hook.py
+++ b/airflow/contrib/hooks/salesforce_hook.py
@@ -24,14 +24,15 @@ NOTE:   this hook also relies on the simple_salesforce package:
 from simple_salesforce import Salesforce
 from airflow.hooks.base_hook import BaseHook
 
-import logging
 import json
 
 import pandas as pd
 import time
 
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
-class SalesforceHook(BaseHook):
+
+class SalesforceHook(BaseHook, LoggingMixin):
     def __init__(
             self,
             conn_id,
@@ -91,13 +92,12 @@ class SalesforceHook(BaseHook):
         """
         self.sign_in()
 
-        logging.info("Querying for all objects")
+        self.logger.info("Querying for all objects")
         query = self.sf.query_all(query)
 
-        logging.info(
-            "Received results: Total size: {0}; Done: {1}".format(
-                query['totalSize'], query['done']
-            )
+        self.logger.info(
+            "Received results: Total size: %s; Done: %s",
+            query['totalSize'], query['done']
         )
 
         query = json.loads(json.dumps(query))
@@ -144,11 +144,9 @@ class SalesforceHook(BaseHook):
         field_string = self._build_field_list(fields)
 
         query = "SELECT {0} FROM {1}".format(field_string, obj)
-        logging.info(
-            "Making query to salesforce: {0}".format(
-                query if len(query) < 30
-                else " ... ".join([query[:15], query[-15:]])
-            )
+        self.logger.info(
+            "Making query to Salesforce: %s",
+            query if len(query) < 30 else " ... ".join([query[:15], query[-15:]])
         )
         return self.make_query(query)
 
@@ -171,8 +169,9 @@ class SalesforceHook(BaseHook):
         try:
             col = pd.to_datetime(col)
         except ValueError:
-            logging.warning(
-                "Could not convert field to timestamps: {0}".format(col.name)
+            log = LoggingMixin().logger
+            log.warning(
+                "Could not convert field to timestamps: %s", col.name
             )
             return col
 
@@ -266,7 +265,7 @@ class SalesforceHook(BaseHook):
             # for each returned record
             object_name = query_results[0]['attributes']['type']
 
-            logging.info("Coercing timestamps for: {0}".format(object_name))
+            self.logger.info("Coercing timestamps for: %s", object_name)
 
             schema = self.describe_object(object_name)
 
@@ -300,7 +299,7 @@ class SalesforceHook(BaseHook):
             # there are also a ton of newline objects
             # that mess up our ability to write to csv
             # we remove these newlines so that the output is a valid CSV format
-            logging.info("Cleaning data and writing to CSV")
+            self.logger.info("Cleaning data and writing to CSV")
             possible_strings = df.columns[df.dtypes == "object"]
             df[possible_strings] = df[possible_strings].apply(
                 lambda x: x.str.replace("\r\n", "")
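
Module-level and static helpers have no class to mix LoggingMixin into, so the patch obtains a logger from a throwaway mixin instance instead (see log = LoggingMixin().logger in the except ValueError branch above, and again in mlengine_operator.py further down). A minimal sketch of that fallback, again using a simplified hypothetical stand-in rather than the real airflow.utils.log.LoggingMixin:

    import logging


    class LoggingMixin(object):
        # Simplified stand-in for airflow.utils.log.LoggingMixin.LoggingMixin.
        @property
        def logger(self):
            return logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__)


    def coerce_to_float(value):
        # Hypothetical module-level helper: no self is available here, so take a
        # logger from a throwaway mixin instance, mirroring log = LoggingMixin().logger.
        log = LoggingMixin().logger
        try:
            return float(value)
        except ValueError:
            log.warning("Could not convert field to float: %s", value)
            return value


    if __name__ == "__main__":
        logging.basicConfig(level=logging.WARNING)
        coerce_to_float("not-a-number")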

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index d7bef7b..aa16130 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -12,16 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import logging
 import subprocess
 
 from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
-log = logging.getLogger(__name__)
 
-
-class SparkSqlHook(BaseHook):
+class SparkSqlHook(BaseHook, LoggingMixin):
     """
     This hook is a wrapper around the spark-sql binary. It requires that the
     "spark-sql" binary is in the PATH.
@@ -123,7 +121,7 @@ class SparkSqlHook(BaseHook):
             connection_cmd += ["--queue", self._yarn_queue]
 
         connection_cmd += cmd
-        logging.debug("Spark-Sql cmd: {}".format(connection_cmd))
+        self.logger.debug("Spark-Sql cmd: %s", connection_cmd)
 
         return connection_cmd
 
@@ -153,5 +151,5 @@ class SparkSqlHook(BaseHook):
 
     def kill(self):
         if self._sp and self._sp.poll() is None:
-            logging.info("Killing the Spark-Sql job")
+            self.logger.info("Killing the Spark-Sql job")
             self._sp.kill()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index a667753..bdd1efe 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -12,18 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import logging
 import os
 import subprocess
 import re
 
 from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
-log = logging.getLogger(__name__)
 
-
-class SparkSubmitHook(BaseHook):
+class SparkSubmitHook(BaseHook, LoggingMixin):
     """
     This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
     It requires that the "spark-submit" binary is in the PATH or the spark_home to be
@@ -63,7 +61,6 @@ class SparkSubmitHook(BaseHook):
     :param verbose: Whether to pass the verbose flag to spark-submit process for debugging
     :type verbose: bool
     """
-
     def __init__(self,
                  conf=None,
                  conn_id='spark_default',
@@ -126,10 +123,9 @@ class SparkSubmitHook(BaseHook):
             conn_data['spark_home'] = extra.get('spark-home', None)
             conn_data['spark_binary'] = extra.get('spark-binary', 'spark-submit')
         except AirflowException:
-            logging.debug(
-                "Could not load connection string {}, defaulting to {}".format(
-                    self._conn_id, conn_data['master']
-                )
+            self.logger.debug(
+                "Could not load connection string %s, defaulting to %s",
+                self._conn_id, conn_data['master']
             )
 
         return conn_data
@@ -196,7 +192,7 @@ class SparkSubmitHook(BaseHook):
         if self._application_args:
             connection_cmd += self._application_args
 
-        logging.debug("Spark-Submit cmd: {}".format(connection_cmd))
+        self.logger.debug("Spark-Submit cmd: %s", connection_cmd)
 
         return connection_cmd
 
@@ -243,15 +239,15 @@ class SparkSubmitHook(BaseHook):
                     self._yarn_application_id = match.groups()[0]
 
             # Pass to logging
-            logging.info(line)
+            self.logger.info(line)
 
     def on_kill(self):
         if self._sp and self._sp.poll() is None:
-            logging.info('Sending kill signal to {}'.format(self._connection['spark_binary']))
+            self.logger.info('Sending kill signal to %s', self._connection['spark_binary'])
             self._sp.kill()
 
             if self._yarn_application_id:
-                logging.info('Killing application on YARN')
+                self.logger.info('Killing application on YARN')
                 kill_cmd = "yarn application -kill {0}".format(self._yarn_application_id).split()
                 yarn_kill = subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-                logging.info("YARN killed with return code: {0}".format(yarn_kill.wait()))
+                self.logger.info("YARN killed with return code: %s", yarn_kill.wait())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 7fbb6c5..0584df4 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -16,17 +16,14 @@
 """
 This module contains a sqoop 1.x hook
 """
-
-import logging
 import subprocess
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
-
-log = logging.getLogger(__name__)
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
-class SqoopHook(BaseHook):
+class SqoopHook(BaseHook, LoggingMixin):
     """
     This hook is a wrapper around the sqoop 1 binary. To be able to use the hook
     it is required that "sqoop" is in the PATH.
@@ -79,7 +76,7 @@ class SqoopHook(BaseHook):
             password_index = cmd.index('--password')
             cmd[password_index + 1] = 'MASKED'
         except ValueError:
-            logging.debug("No password in sqoop cmd")
+            self.logger.debug("No password in sqoop cmd")
         return cmd
 
     def Popen(self, cmd, **kwargs):
@@ -90,21 +87,21 @@ class SqoopHook(BaseHook):
         :param kwargs: extra arguments to Popen (see subprocess.Popen)
         :return: handle to subprocess
         """
-        logging.info("Executing command: {}".format(' '.join(cmd)))
+        self.logger.info("Executing command: %s", ' '.join(cmd))
         sp = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               **kwargs)
 
         for line in iter(sp.stdout):
-            logging.info(line.strip())
+            self.logger.info(line.strip())
 
         sp.wait()
 
-        logging.info("Command exited with return code {0}".format(sp.returncode))
+        self.logger.info("Command exited with return code %s", sp.returncode)
 
         if sp.returncode:
-            raise AirflowException("Sqoop command failed: {}".format(' '.join(cmd)))
+            raise AirflowException("Sqoop command failed: %s", ' '.join(cmd))
 
     def _prepare_command(self, export=False):
         if export:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/hooks/ssh_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py
index f1e25a6..3fe9146 100755
--- a/airflow/contrib/hooks/ssh_hook.py
+++ b/airflow/contrib/hooks/ssh_hook.py
@@ -16,7 +16,6 @@
 # limitations under the License.
 
 import getpass
-import logging
 import os
 
 import paramiko
@@ -24,9 +23,10 @@ import paramiko
 from contextlib import contextmanager
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 
-class SSHHook(BaseHook):
+class SSHHook(BaseHook, LoggingMixin):
     """
     Hook for ssh remote execution using Paramiko.
     ref: https://github.com/paramiko/paramiko
@@ -70,7 +70,7 @@ class SSHHook(BaseHook):
 
     def get_conn(self):
         if not self.client:
-            logging.debug('creating ssh client for conn_id: {0}'.format(self.ssh_conn_id))
+            self.logger.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
             if self.ssh_conn_id is not None:
                 conn = self.get_connection(self.ssh_conn_id)
                 if self.username is None:
@@ -98,9 +98,11 @@ class SSHHook(BaseHook):
 
             # Auto detecting username values from system
             if not self.username:
-                logging.debug("username to ssh to host: {0} is not specified, using "
-                             "system's default provided by getpass.getuser()"
-                             .format(self.remote_host, self.ssh_conn_id))
+                self.logger.debug(
+                    "username to ssh to host: %s is not specified for connection id"
+                    " %s. Using system's default provided by getpass.getuser()",
+                    self.remote_host, self.ssh_conn_id
+                )
                 self.username = getpass.getuser()
 
             host_proxy = None
@@ -140,14 +142,20 @@ class SSHHook(BaseHook):
 
                 self.client = client
             except paramiko.AuthenticationException as auth_error:
-                logging.error("Auth failed while connecting to host: {0}, error: {1}"
-                              .format(self.remote_host, auth_error))
+                self.logger.error(
+                    "Auth failed while connecting to host: %s, error: %s",
+                    self.remote_host, auth_error
+                )
             except paramiko.SSHException as ssh_error:
-                logging.error("Failed connecting to host: {0}, error: {1}"
-                              .format(self.remote_host, ssh_error))
+                self.logger.error(
+                    "Failed connecting to host: %s, error: %s",
+                    self.remote_host, ssh_error
+                )
             except Exception as error:
-                logging.error("Error connecting to host: {0}, error: {1}"
-                              .format(self.remote_host, error))
+                self.logger.error(
+                    "Error connecting to host: %s, error: %s",
+                    self.remote_host, error
+                )
         return self.client
 
     @contextmanager
@@ -183,7 +191,7 @@ class SSHHook(BaseHook):
                           ]
 
         ssh_cmd += ssh_tunnel_cmd
-        logging.debug("creating tunnel with cmd: {0}".format(ssh_cmd))
+        self.logger.debug("Creating tunnel with cmd: %s", ssh_cmd)
 
         proc = subprocess.Popen(ssh_cmd,
                                 stdin=subprocess.PIPE,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 3b804a8..37e4a97 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -89,7 +87,7 @@ class BigQueryOperator(BaseOperator):
         self.query_params = query_params
 
     def execute(self, context):
-        logging.info('Executing: %s', self.bql)
+        self.logger.info('Executing: %s', self.bql)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                             delegate_to=self.delegate_to)
         conn = hook.get_conn()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_table_delete_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py
index 643f5ac..21de7cc 100644
--- a/airflow/contrib/operators/bigquery_table_delete_operator.py
+++ b/airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -55,7 +53,7 @@ class BigQueryTableDeleteOperator(BaseOperator):
         self.ignore_if_missing = ignore_if_missing
 
     def execute(self, context):
-        logging.info('Deleting: %s', self.deletion_dataset_table)
+        self.logger.info('Deleting: %s', self.deletion_dataset_table)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                             delegate_to=self.delegate_to)
         conn = hook.get_conn()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_to_bigquery.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py
index 6f4843c..8e21270 100644
--- a/airflow/contrib/operators/bigquery_to_bigquery.py
+++ b/airflow/contrib/operators/bigquery_to_bigquery.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -70,8 +68,10 @@ class BigQueryToBigQueryOperator(BaseOperator):
         self.delegate_to = delegate_to
 
     def execute(self, context):
-        logging.info('Executing copy of %s into: %s', self.source_project_dataset_tables,
-                     self.destination_project_dataset_table)
+        self.logger.info(
+            'Executing copy of %s into: %s',
+            self.source_project_dataset_tables, self.destination_project_dataset_table
+        )
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                             delegate_to=self.delegate_to)
         conn = hook.get_conn()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/bigquery_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py
index aaff462..23a2029 100644
--- a/airflow/contrib/operators/bigquery_to_gcs.py
+++ b/airflow/contrib/operators/bigquery_to_gcs.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -81,7 +79,7 @@ class BigQueryToCloudStorageOperator(BaseOperator):
         self.delegate_to = delegate_to
 
     def execute(self, context):
-        logging.info('Executing extract of %s into: %s',
+        self.logger.info('Executing extract of %s into: %s',
                      self.source_project_dataset_table,
                      self.destination_cloud_storage_uris)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index 1aa1441..8773357 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 #
 
-import logging
 import six
 import time
 
@@ -21,8 +20,6 @@ from airflow.exceptions import AirflowException
 from airflow.contrib.hooks.databricks_hook import DatabricksHook
 from airflow.models import BaseOperator
 
-LINE_BREAK = ('-' * 80)
-
 
 class DatabricksSubmitRunOperator(BaseOperator):
     """
@@ -217,7 +214,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
             raise AirflowException(msg)
 
     def _log_run_page_url(self, url):
-        logging.info('View run status, Spark UI, and logs at {}'.format(url))
+        self.logger.info('View run status, Spark UI, and logs at %s', url)
 
     def get_hook(self):
         return DatabricksHook(
@@ -228,16 +225,13 @@ class DatabricksSubmitRunOperator(BaseOperator):
         hook = self.get_hook()
         self.run_id = hook.submit_run(self.json)
         run_page_url = hook.get_run_page_url(self.run_id)
-        logging.info(LINE_BREAK)
-        logging.info('Run submitted with run_id: {}'.format(self.run_id))
+        self.logger.info('Run submitted with run_id: %s', self.run_id)
         self._log_run_page_url(run_page_url)
-        logging.info(LINE_BREAK)
         while True:
             run_state = hook.get_run_state(self.run_id)
             if run_state.is_terminal:
                 if run_state.is_successful:
-                    logging.info('{} completed successfully.'.format(
-                        self.task_id))
+                    self.logger.info('%s completed successfully.', self.task_id)
                     self._log_run_page_url(run_page_url)
                     return
                 else:
@@ -246,16 +240,15 @@ class DatabricksSubmitRunOperator(BaseOperator):
                         s=run_state)
                     raise AirflowException(error_message)
             else:
-                logging.info('{t} in run state: {s}'.format(t=self.task_id,
-                                                            s=run_state))
+                self.logger.info('%s in run state: %s', self.task_id, run_state)
                 self._log_run_page_url(run_page_url)
-                logging.info('Sleeping for {} seconds.'.format(
-                    self.polling_period_seconds))
+                self.logger.info('Sleeping for %s seconds.', self.polling_period_seconds)
                 time.sleep(self.polling_period_seconds)
 
     def on_kill(self):
         hook = self.get_hook()
         hook.cancel_run(self.run_id)
-        logging.info('Task: {t} with run_id: {r} was requested to be cancelled.'.format(
-            t=self.task_id,
-            r=self.run_id))
+        self.logger.info(
+            'Task: %s with run_id: %s was requested to be cancelled.',
+            self.task_id, self.run_id
+        )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index c0ff6a7..3c22b60 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 #
 
-import logging
 import time
 
 from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
@@ -178,13 +177,14 @@ class DataprocClusterCreateOperator(BaseOperator):
         while True:
             state = self._get_cluster_state(service)
             if state is None:
-                logging.info("No state for cluster '%s'", self.cluster_name)
+                self.logger.info("No state for cluster '%s'", self.cluster_name)
                 time.sleep(15)
             else:
-                logging.info("State for cluster '%s' is %s", self.cluster_name, state)
+                self.logger.info("State for cluster '%s' is %s", self.cluster_name, state)
                 if self._cluster_ready(state, service):
-                    logging.info("Cluster '%s' successfully created",
-                                 self.cluster_name)
+                    self.logger.info(
+                        "Cluster '%s' successfully created", self.cluster_name
+                    )
                     return
                 time.sleep(15)
 
@@ -264,7 +264,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         return cluster_data
 
     def execute(self, context):
-        logging.info('Creating cluster: {}'.format(self.cluster_name))
+        self.logger.info('Creating cluster: %s', self.cluster_name)
         hook = DataProcHook(
             gcp_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to
@@ -272,9 +272,10 @@ class DataprocClusterCreateOperator(BaseOperator):
         service = hook.get_conn()
 
         if self._get_cluster(service):
-            logging.info('Cluster {} already exists... Checking status...'.format(
-                            self.cluster_name
-                        ))
+            self.logger.info(
+                'Cluster %s already exists... Checking status...',
+                self.cluster_name
+            )
             self._wait_for_done(service)
             return True
 
@@ -289,9 +290,10 @@ class DataprocClusterCreateOperator(BaseOperator):
             # probably two cluster start commands at the same time
             time.sleep(10)
             if self._get_cluster(service):
-                logging.info('Cluster {} already exists... Checking status...'.format(
-                             self.cluster_name
-                             ))
+                self.logger.info(
+                    'Cluster %s already exists... Checking status...',
+                    self.cluster_name
+                )
                 self._wait_for_done(service)
                 return True
             else:
@@ -356,7 +358,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
             time.sleep(15)
 
     def execute(self, context):
-        logging.info('Deleting cluster: {}'.format(self.cluster_name))
+        self.logger.info('Deleting cluster: %s', self.cluster_name)
         hook = DataProcHook(
             gcp_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to
@@ -369,7 +371,7 @@ class DataprocClusterDeleteOperator(BaseOperator):
             clusterName=self.cluster_name
         ).execute()
         operation_name = response['name']
-        logging.info("Cluster delete operation name: {}".format(operation_name))
+        self.logger.info("Cluster delete operation name: %s", operation_name)
         self._wait_for_done(service, operation_name)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/datastore_export_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_export_operator.py b/airflow/contrib/operators/datastore_export_operator.py
index 1980dfe..76415e1 100644
--- a/airflow/contrib/operators/datastore_export_operator.py
+++ b/airflow/contrib/operators/datastore_export_operator.py
@@ -12,13 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-import logging
 from airflow.contrib.hooks.datastore_hook import DatastoreHook
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 
+
 class DatastoreExportOperator(BaseOperator):
     """
     Export entities from Google Cloud Datastore to Cloud Storage
@@ -79,7 +78,7 @@ class DatastoreExportOperator(BaseOperator):
         self.xcom_push = xcom_push
 
     def execute(self, context):
-        logging.info('Exporting data to Cloud Storage bucket ' + self.bucket)
+        self.logger.info('Exporting data to Cloud Storage bucket %s', self.bucket)
 
         if self.overwrite_existing and self.namespace:
             gcs_hook = GoogleCloudStorageHook(self.cloud_storage_conn_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/datastore_import_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/datastore_import_operator.py b/airflow/contrib/operators/datastore_import_operator.py
index 3427ba5..74bd940 100644
--- a/airflow/contrib/operators/datastore_import_operator.py
+++ b/airflow/contrib/operators/datastore_import_operator.py
@@ -12,13 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-import logging
-
 from airflow.contrib.hooks.datastore_hook import DatastoreHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 
+
 class DatastoreImportOperator(BaseOperator):
     """
     Import entities from Cloud Storage to Google Cloud Datastore
@@ -74,7 +72,7 @@ class DatastoreImportOperator(BaseOperator):
         self.xcom_push = xcom_push
 
     def execute(self, context):
-        logging.info('Importing data from Cloud Storage bucket ' + self.bucket)
+        self.logger.info('Importing data from Cloud Storage bucket %s', self.bucket)
         ds_hook = DatastoreHook(self.datastore_conn_id, self.delegate_to)
         result = ds_hook.import_from_storage_bucket(bucket=self.bucket,
                                                     file=self.file,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/ecs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py
index 11f8c94..0c75eaa 100644
--- a/airflow/contrib/operators/ecs_operator.py
+++ b/airflow/contrib/operators/ecs_operator.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import sys
-import logging
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
@@ -57,12 +56,11 @@ class ECSOperator(BaseOperator):
         self.hook = self.get_hook()
 
     def execute(self, context):
-
-        logging.info('Running ECS Task - Task definition: {} - on cluster {}'.format(
-            self.task_definition,
-            self.cluster
-        ))
-        logging.info('ECSOperator overrides: {}'.format(self.overrides))
+        self.logger.info(
+            'Running ECS Task - Task definition: %s - on cluster %s',
+            self.task_definition, self.cluster
+        )
+        self.logger.info('ECSOperator overrides: %s', self.overrides)
 
         self.client = self.hook.get_client_type(
             'ecs',
@@ -77,15 +75,15 @@ class ECSOperator(BaseOperator):
         )
 
         failures = response['failures']
-        if (len(failures) > 0):
+        if len(failures) > 0:
             raise AirflowException(response)
-        logging.info('ECS Task started: {}'.format(response))
+        self.logger.info('ECS Task started: %s', response)
 
         self.arn = response['tasks'][0]['taskArn']
         self._wait_for_task_ended()
 
         self._check_success_task()
-        logging.info('ECS Task has been successfully executed: {}'.format(response))
+        self.logger.info('ECS Task has been successfully executed: %s', response)
 
     def _wait_for_task_ended(self):
         waiter = self.client.get_waiter('tasks_stopped')
@@ -100,9 +98,9 @@ class ECSOperator(BaseOperator):
             cluster=self.cluster,
             tasks=[self.arn]
         )
-        logging.info('ECS Task stopped, check status: {}'.format(response))
+        self.logger.info('ECS Task stopped, check status: %s', response)
 
-        if (len(response.get('failures', [])) > 0):
+        if len(response.get('failures', [])) > 0:
             raise AirflowException(response)
 
         for task in response['tasks']:
@@ -126,4 +124,4 @@ class ECSOperator(BaseOperator):
             cluster=self.cluster,
             task=self.arn,
             reason='Task killed by the user')
-        logging.info(response)
+        self.logger.info(response)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py
index 84ef2d0..dbf764e 100644
--- a/airflow/contrib/operators/emr_add_steps_operator.py
+++ b/airflow/contrib/operators/emr_add_steps_operator.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.models import BaseOperator
 from airflow.utils import apply_defaults
 from airflow.exceptions import AirflowException
@@ -51,11 +48,11 @@ class EmrAddStepsOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        logging.info('Adding steps to %s', self.job_flow_id)
+        self.logger.info('Adding steps to %s', self.job_flow_id)
         response = emr.add_job_flow_steps(JobFlowId=self.job_flow_id, Steps=self.steps)
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('Adding steps failed: %s' % response)
         else:
-            logging.info('Steps %s added to JobFlow', response['StepIds'])
+            self.logger.info('Steps %s added to JobFlow', response['StepIds'])
             return response['StepIds']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py
index 37d885d..4e40b17 100644
--- a/airflow/contrib/operators/emr_create_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_create_job_flow_operator.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.contrib.hooks.emr_hook import EmrHook
 from airflow.models import BaseOperator
 from airflow.utils import apply_defaults
@@ -53,11 +50,14 @@ class EmrCreateJobFlowOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id)
 
-        logging.info('Creating JobFlow')
+        self.logger.info(
+            'Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s',
+            self.aws_conn_id, self.emr_conn_id
+        )
         response = emr.create_job_flow(self.job_flow_overrides)
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('JobFlow creation failed: %s' % response)
         else:
-            logging.info('JobFlow with id %s created', response['JobFlowId'])
+            self.logger.info('JobFlow with id %s created', response['JobFlowId'])
             return response['JobFlowId']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
index 1b57276..df641ad 100644
--- a/airflow/contrib/operators/emr_terminate_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.models import BaseOperator
 from airflow.utils import apply_defaults
 from airflow.exceptions import AirflowException
@@ -46,10 +43,10 @@ class EmrTerminateJobFlowOperator(BaseOperator):
     def execute(self, context):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        logging.info('Terminating JobFlow %s', self.job_flow_id)
+        self.logger.info('Terminating JobFlow %s', self.job_flow_id)
         response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
             raise AirflowException('JobFlow termination failed: %s' % response)
         else:
-            logging.info('JobFlow with id %s terminated', self.job_flow_id)
+            self.logger.info('JobFlow with id %s terminated', self.job_flow_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/file_to_wasb.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/file_to_wasb.py b/airflow/contrib/operators/file_to_wasb.py
index 32e6b29..4519e1e 100644
--- a/airflow/contrib/operators/file_to_wasb.py
+++ b/airflow/contrib/operators/file_to_wasb.py
@@ -12,9 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-import logging
-
 from airflow.contrib.hooks.wasb_hook import WasbHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -23,7 +20,7 @@ from airflow.utils.decorators import apply_defaults
 class FileToWasbOperator(BaseOperator):
     """
     Uploads a file to Azure Blob Storage.
-    
+
     :param file_path: Path to the file to load.
     :type file_path: str
     :param container_name: Name of the container.
@@ -54,8 +51,7 @@ class FileToWasbOperator(BaseOperator):
     def execute(self, context):
         """Upload a file to Azure Blob Storage."""
         hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
-        logging.info(
-            'Uploading {self.file_path} to wasb://{self.container_name} as '
-            '{self.blob_name}'.format(**locals()))
-        hook.load_file(self.file_path, self.container_name, self.blob_name,
-                       **self.load_options)
+        self.logger.info(
+            'Uploading %s to wasb://%s as %s', self.file_path, self.container_name, self.blob_name
+        )
+        hook.load_file(self.file_path, self.container_name, self.blob_name, **self.load_options)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/fs_operator.py b/airflow/contrib/operators/fs_operator.py
index 2596487..ca7d716 100644
--- a/airflow/contrib/operators/fs_operator.py
+++ b/airflow/contrib/operators/fs_operator.py
@@ -14,7 +14,6 @@
 #
 
 from os import walk
-import logging
 
 from airflow.operators.sensors import BaseSensorOperator
 from airflow.contrib.hooks.fs_hook import FSHook
@@ -49,8 +48,7 @@ class FileSensor(BaseSensorOperator):
         hook = FSHook(self.fs_conn_id)
         basepath = hook.get_path()
         full_path = "/".join([basepath, self.filepath])
-        logging.info(
-            'Poking for file {full_path} '.format(**locals()))
+        self.logger.info('Poking for file %s', full_path)
         try:
             files = [f for f in walk(full_path)]
         except:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/gcs_download_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py
index c17f774..27e85b7 100644
--- a/airflow/contrib/operators/gcs_download_operator.py
+++ b/airflow/contrib/operators/gcs_download_operator.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
 import sys
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
@@ -48,7 +47,6 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
     template_fields = ('bucket', 'object', 'filename', 'store_to_xcom_key',)
     ui_color = '#f0eee4'
 
-    @apply_defaults
     def __init__(self,
                  bucket,
                  object,
@@ -67,7 +65,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
         self.delegate_to = delegate_to
 
     def execute(self, context):
-        logging.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
+        self.logger.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
         hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                                       delegate_to=self.delegate_to)
         file_bytes = hook.download(self.bucket, self.object, self.filename)
@@ -76,4 +74,4 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
                 context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
             else:
                 raise RuntimeError('The size of the downloaded file is too large to push to XCom!')
-        logging.info(file_bytes)
+        self.logger.info(file_bytes)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index 39f0a48..01f53cc 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import json
-import logging
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
@@ -190,7 +189,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
                 self.destination_project_dataset_table))
             row = cursor.fetchone()
             max_id = row[0] if row[0] else 0
-            logging.info('Loaded BQ data with max {}.{}={}'.format(
-                self.destination_project_dataset_table,
-                self.max_id_key, max_id))
+            self.logger.info(
+                'Loaded BQ data with max %s.%s=%s',
+                self.destination_project_dataset_table, self.max_id_key, max_id
+            )
             return max_id

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/hipchat_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/hipchat_operator.py b/airflow/contrib/operators/hipchat_operator.py
index aeb37d9..19c6d76 100644
--- a/airflow/contrib/operators/hipchat_operator.py
+++ b/airflow/contrib/operators/hipchat_operator.py
@@ -17,7 +17,6 @@ from builtins import str
 from airflow.utils.decorators import apply_defaults
 from airflow.models import BaseOperator
 from airflow.exceptions import AirflowException
-import logging
 import requests
 import json
 
@@ -67,7 +66,7 @@ class HipChatAPIOperator(BaseOperator):
                                         'Authorization': 'Bearer %s' % self.token},
                                     data=self.body)
         if response.status_code >= 400:
-            logging.error('HipChat API call failed: %s %s',
+            self.logger.error('HipChat API call failed: %s %s',
                           response.status_code, response.reason)
             raise AirflowException('HipChat API call failed: %s %s' %
                                    (response.status_code, response.reason))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/mlengine_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mlengine_operator.py b/airflow/contrib/operators/mlengine_operator.py
index 7476825..fdbfede 100644
--- a/airflow/contrib/operators/mlengine_operator.py
+++ b/airflow/contrib/operators/mlengine_operator.py
@@ -13,8 +13,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
 import re
 
 from airflow import settings
@@ -24,8 +22,9 @@ from airflow.operators import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from apiclient import errors
 
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
-logging.getLogger('GoogleCloudMLEngine').setLevel(settings.LOGGING_LEVEL)
+log = LoggingMixin().logger
 
 
 def _create_prediction_input(project_id,
@@ -52,7 +51,6 @@ def _create_prediction_input(project_id,
     Raises:
         ValueError: if a unique model/version origin cannot be determined.
     """
-
     prediction_input = {
         'dataFormat': data_format,
         'inputPaths': input_paths,
@@ -62,9 +60,9 @@ def _create_prediction_input(project_id,
 
     if uri:
         if model_name or version_name:
-            logging.error(
-                'Ambiguous model origin: Both uri and model/version name are '
-                'provided.')
+            log.error(
+                'Ambiguous model origin: Both uri and model/version name are provided.'
+            )
             raise ValueError('Ambiguous model origin.')
         prediction_input['uri'] = uri
     elif model_name:
@@ -75,7 +73,7 @@ def _create_prediction_input(project_id,
             prediction_input['versionName'] = \
                 origin_name + '/versions/{}'.format(version_name)
     else:
-        logging.error(
+        log.error(
             'Missing model origin: Batch prediction expects a model, '
             'a model & version combination, or a URI to savedModel.')
         raise ValueError('Missing model origin.')
@@ -227,9 +225,10 @@ class MLEngineBatchPredictionOperator(BaseOperator):
                 model_name, version_name, uri, max_worker_count,
                 runtime_version)
         except ValueError as e:
-            logging.error(
-                'Cannot create batch prediction job request due to: {}'
-                .format(str(e)))
+            self.logger.error(
+                'Cannot create batch prediction job request due to: %s',
+                e
+            )
             raise
 
         self.prediction_job_request = {
@@ -252,7 +251,7 @@ class MLEngineBatchPredictionOperator(BaseOperator):
             raise
 
         if finished_prediction_job['state'] != 'SUCCEEDED':
-            logging.error(
+            self.logger.error(
                 'Batch prediction job failed: %s',
                 str(finished_prediction_job))
             raise RuntimeError(finished_prediction_job['errorMessage'])
@@ -539,9 +538,8 @@ class MLEngineTrainingOperator(BaseOperator):
         }
 
         if self._mode == 'DRY_RUN':
-            logging.info('In dry_run mode.')
-            logging.info(
-                'MLEngine Training job request is: {}'.format(training_request))
+            self.logger.info('In dry_run mode.')
+            self.logger.info('MLEngine Training job request is: %s', training_request)
             return
 
         hook = MLEngineHook(
@@ -559,6 +557,6 @@ class MLEngineTrainingOperator(BaseOperator):
             raise
 
         if finished_training_job['state'] != 'SUCCEEDED':
-            logging.error('MLEngine training job failed: {}'.format(
+            self.logger.error('MLEngine training job failed: {}'.format(
                 str(finished_training_job)))
             raise RuntimeError(finished_training_job['errorMessage'])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/mlengine_prediction_summary.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mlengine_prediction_summary.py b/airflow/contrib/operators/mlengine_prediction_summary.py
index 1f4d540..17fc2c0 100644
--- a/airflow/contrib/operators/mlengine_prediction_summary.py
+++ b/airflow/contrib/operators/mlengine_prediction_summary.py
@@ -95,7 +95,6 @@ from __future__ import print_function
 import argparse
 import base64
 import json
-import logging
 import os
 
 import apache_beam as beam
@@ -173,5 +172,4 @@ def run(argv=None):
 
 
 if __name__ == "__main__":
-    logging.getLogger().setLevel(logging.INFO)
     run()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index 374567b..7e83bce 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -13,14 +13,12 @@
 # limitations under the License.
 
 import json
-import logging
 import time
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.hooks.mysql_hook import MySqlHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
-from collections import OrderedDict
 from datetime import date, datetime
 from decimal import Decimal
 from MySQLdb.constants import FIELD_TYPE
@@ -170,7 +168,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
                 'mode': field_mode,
             })
 
-        logging.info('Using schema for %s: %s', self.schema_filename, schema)
+        self.logger.info('Using schema for %s: %s', self.schema_filename, schema)
         tmp_schema_file_handle = NamedTemporaryFile(delete=True)
         json.dump(schema, tmp_schema_file_handle)
         return {self.schema_filename: tmp_schema_file_handle}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/sftp_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sftp_operator.py b/airflow/contrib/operators/sftp_operator.py
index b9f07d5..5abfc51 100644
--- a/airflow/contrib/operators/sftp_operator.py
+++ b/airflow/contrib/operators/sftp_operator.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.contrib.hooks.ssh_hook import SSHHook
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
@@ -84,12 +81,12 @@ class SFTPOperator(BaseOperator):
             if self.operation.lower() == SFTPOperation.GET:
                 file_msg = "from {0} to {1}".format(self.remote_filepath,
                                                     self.local_filepath)
-                logging.debug("Starting to transfer {0}".format(file_msg))
+                self.logger.debug("Starting to transfer %s", file_msg)
                 sftp_client.get(self.remote_filepath, self.local_filepath)
             else:
                 file_msg = "from {0} to {1}".format(self.local_filepath,
                                                     self.remote_filepath)
-                logging.debug("Starting to transfer file {0}".format(file_msg))
+                self.logger.debug("Starting to transfer file %s", file_msg)
                 sftp_client.put(self.local_filepath, self.remote_filepath)
 
         except Exception as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py
index ca628e9..2aed4c6 100644
--- a/airflow/contrib/operators/spark_submit_operator.py
+++ b/airflow/contrib/operators/spark_submit_operator.py
@@ -12,14 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import logging
-
 from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook
 from airflow.models import BaseOperator
-from airflow.utils.decorators import apply_defaults
 from airflow.settings import WEB_COLORS
-
-log = logging.getLogger(__name__)
+from airflow.utils.decorators import apply_defaults
 
 
 class SparkSubmitOperator(BaseOperator):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/ssh_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
index 2e03b96..897cd1a 100644
--- a/airflow/contrib/operators/ssh_operator.py
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 from base64 import b64encode
-import logging
 
 from airflow import configuration
 from airflow.contrib.hooks.ssh_hook import SSHHook

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/vertica_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py
index 9266563..fc9cf3b 100644
--- a/airflow/contrib/operators/vertica_operator.py
+++ b/airflow/contrib/operators/vertica_operator.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.contrib.hooks.vertica_hook import VerticaHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -42,6 +39,6 @@ class VerticaOperator(BaseOperator):
         self.sql = sql
 
     def execute(self, context):
-        logging.info('Executing: ' + str(self.sql))
+        self.logger.info('Executing: %s', self.sql)
         hook = VerticaHook(vertica_conn_id=self.vertica_conn_id)
         hook.run(self.sql)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/operators/vertica_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py
index 31ce110..35ff3c6 100644
--- a/airflow/contrib/operators/vertica_to_hive.py
+++ b/airflow/contrib/operators/vertica_to_hive.py
@@ -15,7 +15,6 @@
 from builtins import chr
 from collections import OrderedDict
 import unicodecsv as csv
-import logging
 from tempfile import NamedTemporaryFile
 
 from airflow.hooks.hive_hooks import HiveCliHook
@@ -104,7 +103,7 @@ class VerticaToHiveTransfer(BaseOperator):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
 
-        logging.info("Dumping Vertica query results to local file")
+        self.logger.info("Dumping Vertica query results to local file")
         conn = vertica.get_conn()
         cursor = conn.cursor()
         cursor.execute(self.sql)
@@ -120,7 +119,7 @@ class VerticaToHiveTransfer(BaseOperator):
             f.flush()
             cursor.close()
             conn.close()
-            logging.info("Loading file into Hive")
+            self.logger.info("Loading file into Hive")
             hive.load_file(
                 f.name,
                 self.hive_table,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
index 8a8ca62..630cebe 100644
--- a/airflow/contrib/sensors/bigquery_sensor.py
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.operators.sensors import BaseSensorOperator
 from airflow.contrib.hooks.bigquery_hook import BigQueryHook
 from airflow.utils.decorators import apply_defaults
@@ -62,7 +59,7 @@ class BigQueryTableSensor(BaseSensorOperator):
 
     def poke(self, context):
         table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
-        logging.info('Sensor checks existence of table: %s', table_uri)
+        self.logger.info('Sensor checks existence of table: %s', table_uri)
         hook = BigQueryHook(
             bigquery_conn_id=self.bigquery_conn_id,
             delegate_to=self.delegate_to)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py
index d8660f7..4ee45f9 100644
--- a/airflow/contrib/sensors/datadog_sensor.py
+++ b/airflow/contrib/sensors/datadog_sensor.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.operators.sensors import BaseSensorOperator
 from airflow.contrib.hooks.datadog_hook import DatadogHook
 from airflow.utils import apply_defaults
@@ -70,7 +67,7 @@ class DatadogSensor(BaseSensorOperator):
             tags=self.tags)
 
         if isinstance(response, dict) and response.get('status', 'ok') != 'ok':
-            logging.error("Unexpected datadog result: %s" % (response))
+            self.logger.error("Unexpected Datadog result: %s", response)
             raise AirflowException("Datadog returned unexpected result")
 
         if self.response_check:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
index 5526604..034fcb6 100644
--- a/airflow/contrib/sensors/emr_base_sensor.py
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.operators.sensors import BaseSensorOperator
 from airflow.utils import apply_defaults
 from airflow.exceptions import AirflowException
@@ -23,7 +20,7 @@ class EmrBaseSensor(BaseSensorOperator):
     """
     Contains general sensor behavior for EMR.
     Subclasses should implement get_emr_response() and state_from_response() methods.
-    Subclasses should also implment NON_TERMINAL_STATES and FAILED_STATE constants.
+    Subclasses should also implement NON_TERMINAL_STATES and FAILED_STATE constants.
     """
     ui_color = '#66c3ff'
 
@@ -39,11 +36,11 @@ class EmrBaseSensor(BaseSensorOperator):
         response = self.get_emr_response()
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
-            logging.info('Bad HTTP response: %s' % response)
+            self.logger.info('Bad HTTP response: %s', response)
             return False
 
         state = self.state_from_response(response)
-        logging.info('Job flow currently %s' % state)
+        self.logger.info('Job flow currently %s', state)
 
         if state in self.NON_TERMINAL_STATES:
             return False
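
To make the contract described in the EmrBaseSensor docstring concrete, a hypothetical subclass could look roughly like the sketch below; the class name, the cluster_id parameter, the state lists and the response layout are illustrative assumptions, not values taken from this patch:

    from airflow.contrib.hooks.emr_hook import EmrHook
    from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
    from airflow.utils import apply_defaults


    class EmrExampleSensor(EmrBaseSensor):
        # Illustrative state lists; a real subclass names the EMR states it expects.
        NON_TERMINAL_STATES = ['STARTING', 'RUNNING', 'WAITING']
        FAILED_STATE = ['TERMINATED_WITH_ERRORS']

        @apply_defaults
        def __init__(self, cluster_id, aws_conn_id='aws_default', *args, **kwargs):
            super(EmrExampleSensor, self).__init__(*args, **kwargs)
            self.cluster_id = cluster_id
            self.aws_conn_id = aws_conn_id

        def get_emr_response(self):
            emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
            self.logger.info('Poking cluster %s', self.cluster_id)
            return emr.describe_cluster(ClusterId=self.cluster_id)

        def state_from_response(self, response):
            # Assumed layout of the describe_cluster response.
            return response['Cluster']['Status']['State']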

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
index 662b3b8..e5610a1 100644
--- a/airflow/contrib/sensors/emr_job_flow_sensor.py
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -11,10 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
-import logging
-
 from airflow.contrib.hooks.emr_hook import EmrHook
 from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
 from airflow.utils import apply_defaults
@@ -45,7 +41,7 @@ class EmrJobFlowSensor(EmrBaseSensor):
     def get_emr_response(self):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        logging.info('Poking cluster %s' % self.job_flow_id)
+        self.logger.info('Poking cluster %s', self.job_flow_id)
         return emr.describe_cluster(ClusterId=self.job_flow_id)
 
     def state_from_response(self, response):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
index 4cc6bc4..e131d77 100644
--- a/airflow/contrib/sensors/emr_step_sensor.py
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.contrib.hooks.emr_hook import EmrHook
 from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
 from airflow.utils import apply_defaults
@@ -48,7 +45,7 @@ class EmrStepSensor(EmrBaseSensor):
     def get_emr_response(self):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        logging.info('Poking step {0} on cluster {1}'.format(self.step_id, self.job_flow_id))
+        self.logger.info('Poking step %s on cluster %s', self.step_id, self.job_flow_id)
         return emr.describe_step(ClusterId=self.job_flow_id, StepId=self.step_id)
 
     def state_from_response(self, response):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
index 4a9428b..2e604e9 100644
--- a/airflow/contrib/sensors/ftp_sensor.py
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -11,9 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import ftplib
-import logging
 
 from airflow.contrib.hooks.ftp_hook import FTPHook, FTPSHook
 from airflow.operators.sensors import BaseSensorOperator
@@ -44,7 +42,7 @@ class FTPSensor(BaseSensorOperator):
 
     def poke(self, context):
         with self._create_hook() as hook:
-            logging.info('Poking for %s', self.path)
+            self.logger.info('Poking for %s', self.path)
             try:
                 hook.get_mod_time(self.path)
             except ftplib.error_perm as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index c9d741b..800c1bd 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.operators.sensors import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
@@ -57,7 +54,7 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator):
         self.delegate_to = delegate_to
 
     def poke(self, context):
-        logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+        self.logger.info('Sensor checks existence of: %s, %s', self.bucket, self.object)
         hook = GoogleCloudStorageHook(
             google_cloud_storage_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to)
@@ -119,7 +116,7 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
         self.delegate_to = delegate_to
 
     def poke(self, context):
-        logging.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+        self.logger.info('Sensor checks existence of: %s, %s', self.bucket, self.object)
         hook = GoogleCloudStorageHook(
             google_cloud_storage_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py
index 4e9bb9b..11e8b07 100644
--- a/airflow/contrib/sensors/hdfs_sensors.py
+++ b/airflow/contrib/sensors/hdfs_sensors.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from airflow.operators.sensors import HdfsSensor
-import logging
 
 
 class HdfsSensorRegex(HdfsSensor):
@@ -29,9 +28,9 @@ class HdfsSensorRegex(HdfsSensor):
         :return: Bool depending on the search criteria
         """
         sb = self.hook(self.hdfs_conn_id).get_conn()
-        logging.getLogger("snakebite").setLevel(logging.WARNING)
-        logging.info(
-            'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals()))
+        self.logger.info(
+            'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals())
+        )
         result = [f for f in sb.ls([self.filepath], include_toplevel=False) if
                   f['file_type'] == 'f' and self.regex.match(f['path'].replace('%s/' % self.filepath, ''))]
         result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
@@ -53,15 +52,14 @@ class HdfsSensorFolder(HdfsSensor):
         :return: Bool depending on the search criteria
         """
         sb = self.hook(self.hdfs_conn_id).get_conn()
-        logging.getLogger("snakebite").setLevel(logging.WARNING)
         result = [f for f in sb.ls([self.filepath], include_toplevel=True)]
         result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
         result = self.filter_for_filesize(result, self.file_size)
         if self.be_empty:
-            logging.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals()))
+            self.logger.info('Poking for filepath {self.filepath} to be an empty directory'.format(**locals()))
             return len(result) == 1 and result[0]['path'] == self.filepath
         else:
-            logging.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
+            self.logger.info('Poking for filepath {self.filepath} to be a non-empty directory'.format(**locals()))
             result.pop(0)
             return bool(result) and result[0]['file_type'] == 'f'
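
Note that this hunk also drops the per-poke logging.getLogger("snakebite").setLevel(logging.WARNING) calls. If the snakebite client remains too chatty, the same effect can be achieved once at configuration time with the standard library; a minimal sketch, where the placement (for example a logging setup module) is an assumption:

    import logging

    # Raise the threshold for the third-party snakebite logger once, instead of
    # on every poke; Airflow's own loggers are unaffected.
    logging.getLogger("snakebite").setLevel(logging.WARNING)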
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/jira_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py
index 708caad..4cbc676 100644
--- a/airflow/contrib/sensors/jira_sensor.py
+++ b/airflow/contrib/sensors/jira_sensor.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from jira.resources import Resource
 
 from airflow.contrib.operators.jira_operator import JIRAError
@@ -100,8 +97,7 @@ class JiraTicketSensor(JiraSensor):
                                                *args, **kwargs)
 
     def poke(self, context):
-        logging.info('Jira Sensor checking for change in ticket : {0}'
-                     .format(self.ticket_id))
+        self.logger.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
 
         self.jira_operator.method_name = "issue"
         self.jira_operator.jira_method_args = {
@@ -127,20 +123,19 @@ class JiraTicketSensor(JiraSensor):
                             and getattr(field_value, 'name'):
                         result = self.expected_value.lower() == field_value.name.lower()
                     else:
-                        logging.warning("not implemented checker for issue field {0} "
-                                        "which is neither string nor list nor "
-                                        "jira Resource".format(self.field))
+                        self.logger.warning(
+                            "No checker implemented for issue field %s, which "
+                            "is neither a string, a list, nor a Jira Resource",
+                            self.field
+                        )
 
         except JIRAError as jira_error:
-            logging.error("jira error while checking with expected value: {0}"
-                          .format(jira_error))
+            self.logger.error("Jira error while checking with expected value: %s", jira_error)
         except Exception as e:
-            logging.error("error while checking with expected value {0}, error: {1}"
-                          .format(self.expected_value, e))
+            self.logger.error("Error while checking with expected value %s:", self.expected_value)
+            self.logger.exception(e)
         if result is True:
-            logging.info("issue field {0} has expected value {1}, returning success"
-                         .format(self.field, self.expected_value))
+            self.logger.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value)
         else:
-            logging.info("issue field {0} dont have expected value {1} yet."
-                         .format(self.field, self.expected_value))
+            self.logger.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value)
         return result

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/redis_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py
index 4cab407..220d766 100644
--- a/airflow/contrib/sensors/redis_key_sensor.py
+++ b/airflow/contrib/sensors/redis_key_sensor.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.contrib.hooks.redis_hook import RedisHook
 from airflow.operators.sensors import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
@@ -37,7 +34,6 @@ class RedisKeySensor(BaseSensorOperator):
         :type redis_conn_id: string
         """
         super(RedisKeySensor, self).__init__(*args, **kwargs)
-        self.logger = logging.getLogger(__name__)
         self.redis_conn_id = redis_conn_id
         self.key = key
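
The removed assignment above would only shadow the logger that the base class already provides. A small hypothetical class as a sketch, assuming the mixin exposes self.logger without further setup, as the LoggingMixin().logger call sites elsewhere in this patch suggest:

    from airflow.utils.log.LoggingMixin import LoggingMixin


    class RedisKeyChecker(LoggingMixin):
        # Hypothetical helper: no logger assignment is needed in __init__,
        # self.logger comes from the mixin.
        def __init__(self, key):
            self.key = key

        def check(self, conn):
            self.logger.info('Checking Redis key %s', self.key)
            return conn.exists(self.key)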
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/sensors/wasb_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py
index 3f3d56c..1a54e12 100644
--- a/airflow/contrib/sensors/wasb_sensor.py
+++ b/airflow/contrib/sensors/wasb_sensor.py
@@ -12,9 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-import logging
-
 from airflow.contrib.hooks.wasb_hook import WasbHook
 from airflow.operators.sensors import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
@@ -23,7 +20,7 @@ from airflow.utils.decorators import apply_defaults
 class WasbBlobSensor(BaseSensorOperator):
     """
     Waits for a blob to arrive on Azure Blob Storage.
-    
+
     :param container_name: Name of the container.
     :type container_name: str
     :param blob_name: Name of the blob.
@@ -50,7 +47,7 @@ class WasbBlobSensor(BaseSensorOperator):
         self.check_options = check_options
 
     def poke(self, context):
-        logging.info(
+        self.logger.info(
             'Poking for blob: {self.blob_name}\n'
             'in wasb://{self.container_name}'.format(**locals())
         )
@@ -62,7 +59,7 @@ class WasbBlobSensor(BaseSensorOperator):
 class WasbPrefixSensor(BaseSensorOperator):
     """
     Waits for blobs matching a prefix to arrive on Azure Blob Storage.
-    
+
     :param container_name: Name of the container.
     :type container_name: str
     :param prefix: Prefix of the blob.
@@ -88,7 +85,7 @@ class WasbPrefixSensor(BaseSensorOperator):
         self.check_options = check_options
 
     def poke(self, context):
-        logging.info(
+        self.logger.info(
             'Poking for prefix: {self.prefix}\n'
             'in wasb://{self.container_name}'.format(**locals())
         )
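
As a usage note for the blob sensor documented in this file, a hypothetical DAG snippet might wire it up as follows; only container_name and blob_name come from the hunks above, the remaining names and values are made up and the defaults for the other constructor arguments are assumed:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.sensors.wasb_sensor import WasbBlobSensor

    dag = DAG('wasb_example', start_date=datetime(2017, 1, 1), schedule_interval=None)

    wait_for_blob = WasbBlobSensor(
        task_id='wait_for_blob',
        container_name='landing',          # container to watch (made-up name)
        blob_name='data/2017-09-13.csv',   # blob to wait for (made-up name)
        dag=dag,
    )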

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/contrib/task_runner/cgroup_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/task_runner/cgroup_task_runner.py b/airflow/contrib/task_runner/cgroup_task_runner.py
index 11c45c1..5d2518d 100644
--- a/airflow/contrib/task_runner/cgroup_task_runner.py
+++ b/airflow/contrib/task_runner/cgroup_task_runner.py
@@ -14,7 +14,6 @@
 
 import datetime
 import getpass
-import subprocess
 import os
 import uuid
 
@@ -73,13 +72,13 @@ class CgroupTaskRunner(BaseTaskRunner):
         for path_element in path_split:
             name_to_node = {x.name: x for x in node.children}
             if path_element not in name_to_node:
-                self.logger.debug("Creating cgroup {} in {}"
-                                  .format(path_element, node.path))
+                self.logger.debug("Creating cgroup %s in %s", path_element, node.path)
                 node = node.create_cgroup(path_element)
             else:
-                self.logger.debug("Not creating cgroup {} in {} "
-                                  "since it already exists"
-                                  .format(path_element, node.path))
+                self.logger.debug(
+                    "Not creating cgroup %s in %s since it already exists",
+                    path_element, node.path
+                )
                 node = name_to_node[path_element]
         return node
 
@@ -95,24 +94,23 @@ class CgroupTaskRunner(BaseTaskRunner):
         for path_element in path_split:
             name_to_node = {x.name: x for x in node.children}
             if path_element not in name_to_node:
-                self.logger.warning("Cgroup does not exist: {}"
-                                    .format(path))
+                self.logger.warning("Cgroup does not exist: %s", path)
                 return
             else:
                 node = name_to_node[path_element]
         # node is now the leaf node
         parent = node.parent
-        self.logger.debug("Deleting cgroup {}/{}".format(parent, node.name))
+        self.logger.debug("Deleting cgroup %s/%s", parent, node.name)
         parent.delete_cgroup(node.name)
 
     def start(self):
         # Use bash if it's already in a cgroup
         cgroups = self._get_cgroup_names()
         if cgroups["cpu"] != "/" or cgroups["memory"] != "/":
-            self.logger.debug("Already running in a cgroup (cpu: {} memory: {} so "
-                              "not creating another one"
-                              .format(cgroups.get("cpu"),
-                                      cgroups.get("memory")))
+            self.logger.debug(
+                "Already running in a cgroup (cpu: %s memory: %s) so not creating another one",
+                cgroups.get("cpu"), cgroups.get("memory")
+            )
             self.process = self.run_command(['bash', '-c'], join_args=True)
             return
 
@@ -135,21 +133,27 @@ class CgroupTaskRunner(BaseTaskRunner):
         mem_cgroup_node = self._create_cgroup(self.mem_cgroup_name)
         self._created_mem_cgroup = True
         if self._mem_mb_limit > 0:
-            self.logger.debug("Setting {} with {} MB of memory"
-                              .format(self.mem_cgroup_name, self._mem_mb_limit))
+            self.logger.debug(
+                "Setting %s with %s MB of memory",
+                self.mem_cgroup_name, self._mem_mb_limit
+            )
             mem_cgroup_node.controller.limit_in_bytes = self._mem_mb_limit * 1024 * 1024
 
         # Create the CPU cgroup
         cpu_cgroup_node = self._create_cgroup(self.cpu_cgroup_name)
         self._created_cpu_cgroup = True
         if self._cpu_shares > 0:
-            self.logger.debug("Setting {} with {} CPU shares"
-                              .format(self.cpu_cgroup_name, self._cpu_shares))
+            self.logger.debug(
+                "Setting %s with %s CPU shares",
+                self.cpu_cgroup_name, self._cpu_shares
+            )
             cpu_cgroup_node.controller.shares = self._cpu_shares
 
         # Start the process w/ cgroups
-        self.logger.debug("Starting task process with cgroups cpu,memory:{}"
-                          .format(cgroup_name))
+        self.logger.debug(
+            "Starting task process with cgroups cpu,memory: %s",
+            cgroup_name
+        )
         self.process = self.run_command(
             ['cgexec', '-g', 'cpu,memory:{}'.format(cgroup_name)]
         )
@@ -165,10 +169,9 @@ class CgroupTaskRunner(BaseTaskRunner):
         # we might want to revisit that approach at some other point.
         if return_code == 137:
             self.logger.warning("Task failed with return code of 137. This may indicate "
-                                "that it was killed due to excessive memory usage. "
-                                "Please consider optimizing your task or using the "
-                                "resources argument to reserve more memory for your "
-                                "task")
+                              "that it was killed due to excessive memory usage. "
+                              "Please consider optimizing your task or using the "
+                              "resources argument to reserve more memory for your task")
         return return_code
 
     def terminate(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index a8cb087..7812f96 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -11,8 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
 import sys
 
 from airflow import configuration
@@ -21,6 +19,7 @@ from airflow.executors.local_executor import LocalExecutor
 from airflow.executors.sequential_executor import SequentialExecutor
 
 from airflow.exceptions import AirflowException
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 DEFAULT_EXECUTOR = None
 
@@ -42,14 +41,15 @@ def GetDefaultExecutor():
 
     DEFAULT_EXECUTOR = _get_executor(executor_name)
 
-    logging.info("Using executor " + executor_name)
+    log = LoggingMixin().logger
+    log.info("Using executor %s", executor_name)
 
     return DEFAULT_EXECUTOR
 
 
 def _get_executor(executor_name):
     """
-    Creates a new instance of the named executor. In case the executor name is not know in airflow, 
+    Creates a new instance of the named executor. In case the executor name is not known in Airflow,
     look for it in the plugins
     """
     if executor_name == 'LocalExecutor':
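
Module-level functions such as GetDefaultExecutor have no self.logger, which is why the patch instantiates the mixin to obtain one. A minimal sketch of the same idiom applied to any module-level helper, assuming only what the call sites above show; the function itself is hypothetical:

    from airflow.utils.log.LoggingMixin import LoggingMixin


    def announce_executor(executor_name):
        # No class, hence no self.logger; a throwaway LoggingMixin supplies one.
        log = LoggingMixin().logger
        log.info("Using executor %s", executor_name)
        return executor_name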

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 7a4065e..1197958 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -11,12 +11,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from builtins import range
 
 from airflow import configuration
+from airflow.utils.log.LoggingMixin import LoggingMixin
 from airflow.utils.state import State
-from airflow.utils.logging import LoggingMixin
 
 PARALLELISM = configuration.getint('core', 'PARALLELISM')
 
@@ -47,7 +46,7 @@ class BaseExecutor(LoggingMixin):
     def queue_command(self, task_instance, command, priority=1, queue=None):
         key = task_instance.key
         if key not in self.queued_tasks and key not in self.running:
-            self.logger.info("Adding to queue: {}".format(command))
+            self.logger.info("Adding to queue: %s", command)
             self.queued_tasks[key] = (command, priority, queue, task_instance)
 
     def queue_task_instance(
@@ -100,9 +99,9 @@ class BaseExecutor(LoggingMixin):
         else:
             open_slots = self.parallelism - len(self.running)
 
-        self.logger.debug("{} running task instances".format(len(self.running)))
-        self.logger.debug("{} in queue".format(len(self.queued_tasks)))
-        self.logger.debug("{} open slots".format(open_slots))
+        self.logger.debug("%s running task instances", len(self.running))
+        self.logger.debug("%s in queue", len(self.queued_tasks))
+        self.logger.debug("%s open slots", open_slots)
 
         sorted_queue = sorted(
             [(k, v) for k, v in self.queued_tasks.items()],
@@ -124,11 +123,12 @@ class BaseExecutor(LoggingMixin):
                 self.execute_async(key, command=command, queue=queue)
             else:
                 self.logger.debug(
-                    'Task is already running, not sending to '
-                    'executor: {}'.format(key))
+                    'Task is already running, not sending to executor: %s',
+                    key
+                )
 
         # Calling child class sync method
-        self.logger.debug("Calling the {} sync method".format(self.__class__))
+        self.logger.debug("Calling the %s sync method", self.__class__)
         self.sync()
 
     def change_state(self, key, state):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 17c343b..39c895c 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 from builtins import object
-import logging
 import subprocess
 import ssl
 import time
@@ -25,6 +24,7 @@ from celery import states as celery_states
 from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow import configuration
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 PARALLELISM = configuration.get('core', 'PARALLELISM')
 
@@ -53,7 +53,8 @@ class CeleryConfig(object):
     try:
         celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
     except AirflowConfigException as e:
-        logging.warning("Celery Executor will run without SSL")
+        log = LoggingMixin().logger
+        log.warning("Celery Executor will run without SSL")
 
     try:
         if celery_ssl_active:
@@ -75,11 +76,12 @@ app = Celery(
 
 @app.task
 def execute_command(command):
-    logging.info("Executing command in Celery " + command)
+    log = LoggingMixin().logger
+    log.info("Executing command in Celery: %s", command)
     try:
         subprocess.check_call(command, shell=True)
     except subprocess.CalledProcessError as e:
-        logging.error(e)
+        log.error(e)
         raise AirflowException('Celery command failed')
 
 
@@ -92,22 +94,18 @@ class CeleryExecutor(BaseExecutor):
     vast amounts of messages, while providing operations with the tools
     required to maintain such a system.
     """
-
     def start(self):
         self.tasks = {}
         self.last_state = {}
 
     def execute_async(self, key, command, queue=DEFAULT_QUEUE):
-        self.logger.info( "[celery] queuing {key} through celery, "
-                       "queue={queue}".format(**locals()))
+        self.logger.info("[celery] queuing {key} through celery, queue={queue}".format(**locals()))
         self.tasks[key] = execute_command.apply_async(
             args=[command], queue=queue)
         self.last_state[key] = celery_states.PENDING
 
     def sync(self):
-
-        self.logger.debug(
-            "Inquiring about {} celery task(s)".format(len(self.tasks)))
+        self.logger.debug("Inquiring about %s celery task(s)", len(self.tasks))
         for key, async in list(self.tasks.items()):
             try:
                 state = async.state
@@ -125,11 +123,11 @@ class CeleryExecutor(BaseExecutor):
                         del self.tasks[key]
                         del self.last_state[key]
                     else:
-                        self.logger.info("Unexpected state: " + async.state)
+                        self.logger.info("Unexpected state: %s", async.state)
                     self.last_state[key] = async.state
             except Exception as e:
-                logging.error("Error syncing the celery executor, ignoring "
-                              "it:\n{}\n".format(e, traceback.format_exc()))
+                self.logger.error("Error syncing the celery executor, ignoring it:")
+                self.logger.exception(e)
 
     def end(self, synchronous=False):
         if synchronous:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index d65830a..8a56506 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import distributed
-
 import subprocess
 import warnings
 
@@ -41,8 +40,8 @@ class DaskExecutor(BaseExecutor):
     def execute_async(self, key, command, queue=None):
         if queue is not None:
             warnings.warn(
-                'DaskExecutor does not support queues. All tasks will be run '
-                'in the same cluster')
+                'DaskExecutor does not support queues. All tasks will be run in the same cluster'
+            )
 
         def airflow_run():
             return subprocess.check_call(command, shell=True)
@@ -54,12 +53,11 @@ class DaskExecutor(BaseExecutor):
         if future.done():
             key = self.futures[future]
             if future.exception():
+                self.logger.error("Failed to execute task: %s", repr(future.exception()))
                 self.fail(key)
-                self.logger.error("Failed to execute task: {}".format(
-                    repr(future.exception())))
             elif future.cancelled():
-                self.fail(key)
                 self.logger.error("Failed to execute task")
+                self.fail(key)
             else:
                 self.success(key)
             self.futures.pop(future)