You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/12/02 08:56:22 UTC

incubator-airflow git commit: [AIRFLOW-1872] Set context for all handlers including parents

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3e321790d -> 406d738b1


[AIRFLOW-1872] Set context for all handlers including parents

Previously, setting the context was not propagated
to the parent
loggers. Unfortunately, in the case of a logger that
is not explicitly defined,
the returned logger is shallow, i.e. it does not
have handlers
defined. So to set the context it is required to
walk up the logger tree.

Closes #2831 from bolkedebruin/fix_logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/406d738b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/406d738b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/406d738b

Branch: refs/heads/master
Commit: 406d738b1cf657b5ee6163bc26ab6fdea891576d
Parents: 3e32179
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Dec 2 09:56:13 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Dec 2 09:56:13 2017 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py                      | 11 +++-------
 airflow/jobs.py                         | 10 ++-------
 airflow/task_runner/base_task_runner.py |  2 +-
 airflow/utils/log/logging_mixin.py      | 32 +++++++++++++++++++---------
 tests/utils/test_logging_mixin.py       | 19 ++++++++++++++++-
 5 files changed, 46 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/406d738b/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 6d01293..1001e05 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -54,7 +54,8 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
 
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
-from airflow.utils.log.logging_mixin import LoggingMixin, redirect_stderr, redirect_stdout
+from airflow.utils.log.logging_mixin import (LoggingMixin, redirect_stderr,
+                                             redirect_stdout, set_context)
 from airflow.www.app import cached_app
 
 from sqlalchemy import func
@@ -367,13 +368,7 @@ def run(args, dag=None):
     if args.raw:
         log = logging.getLogger('airflow.task.raw')
 
-    for handler in log.handlers:
-        try:
-            handler.set_context(ti)
-        except AttributeError:
-            # Not all handlers need to have context passed in so we ignore
-            # the error when handlers do not have set_context defined.
-            pass
+    set_context(log, ti)
 
     hostname = socket.getfqdn()
     log.info("Running on host %s", hostname)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/406d738b/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 868e785..0ff7819 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -57,7 +57,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor,
 from airflow.utils.db import (
     create_session, provide_session, pessimistic_connection_handling)
 from airflow.utils.email import send_email
-from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter
+from airflow.utils.log.logging_mixin import LoggingMixin, set_context, StreamLogWriter
 from airflow.utils.state import State
 
 Base = models.Base
@@ -345,13 +345,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
             stdout = StreamLogWriter(log, logging.INFO)
             stderr = StreamLogWriter(log, logging.WARN)
 
-            for handler in log.handlers:
-                try:
-                    handler.set_context(file_path)
-                except AttributeError:
-                    # Not all handlers need to have context passed in so we ignore
-                    # the error when handlers do not have set_context defined.
-                    pass
+            set_context(log, file_path)
 
             try:
                 # redirect stdout/stderr to log

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/406d738b/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index f4b4f2d..ff23a50 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -38,8 +38,8 @@ class BaseTaskRunner(LoggingMixin):
         :type local_task_job: airflow.jobs.LocalTaskJob
         """
         # Pass task instance context into log handlers to setup the logger.
+        super(BaseTaskRunner, self).__init__(local_task_job.task_instance)
         self._task_instance = local_task_job.task_instance
-        self.set_log_contexts(self._task_instance)
 
         popen_prepend = []
         cfg_path = None

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/406d738b/airflow/utils/log/logging_mixin.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
index c12bb8b..892fae5 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -29,6 +29,9 @@ class LoggingMixin(object):
     """
     Convenience super-class to have a logger configured with the class name
     """
+    def __init__(self, context=None):
+        if context is not None:
+            set_context(self.log, context)
 
     # We want to deprecate the logger property in Airflow 2.0
     # The log property is the de facto standard in most programming languages
@@ -53,16 +56,6 @@ class LoggingMixin(object):
             )
             return self._log
 
-    def set_log_contexts(self, task_instance):
-        """
-        Set the context for all handlers of current logger.
-        """
-        for handler in self.log.handlers:
-            try:
-                handler.set_context(task_instance)
-            except AttributeError:
-                pass
-
 
 class StreamLogWriter(object):
     encoding = False
@@ -127,3 +120,22 @@ def redirect_stderr(logger, level):
         sys.stderr = sys.__stderr__
 
 
+def set_context(logger, value):
+    """
+    Walks the tree of loggers and tries to set the context for each handler
+    :param logger: logger
+    :param value: value to set
+    """
+    _logger = logger
+    while _logger:
+        for handler in _logger.handlers:
+            try:
+                handler.set_context(value)
+            except AttributeError:
+                # Not all handlers need to have context passed in so we ignore
+                # the error when handlers do not have set_context defined.
+                pass
+        if _logger.propagate is True:
+            _logger = _logger.parent
+        else:
+            _logger = None

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/406d738b/tests/utils/test_logging_mixin.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_logging_mixin.py b/tests/utils/test_logging_mixin.py
index 52d8b45..a7c260d 100644
--- a/tests/utils/test_logging_mixin.py
+++ b/tests/utils/test_logging_mixin.py
@@ -17,7 +17,7 @@ import unittest
 import warnings
 
 from airflow.operators.bash_operator import BashOperator
-from airflow.utils.log.logging_mixin import StreamLogWriter
+from airflow.utils.log.logging_mixin import set_context, StreamLogWriter
 from tests.test_utils.reset_warning_registry import reset_warning_registry
 
 
@@ -48,6 +48,23 @@ class TestLoggingMixin(unittest.TestCase):
                     str(warning.message)
                 )
 
+    def test_set_context(self):
+        handler1 = mock.MagicMock()
+        handler2 = mock.MagicMock()
+        parent = mock.MagicMock()
+        parent.propagate = False
+        parent.handlers = [handler1, ]
+        log = mock.MagicMock()
+        log.handlers = [handler2, ]
+        log.parent = parent
+        log.propagate = True
+
+        value = "test"
+        set_context(log, value)
+
+        handler1.set_context.assert_called_with(value)
+        handler2.set_context.assert_called_with(value)
+
     def tearDown(self):
         warnings.resetwarnings()