You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/10/18 13:10:34 UTC

[airflow] 18/41: Ensure the log messages from operators during parsing go somewhere (#26779)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 88bed082e67c04b64bc024a21fddcee9f372f711
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Fri Sep 30 14:08:41 2022 +0100

    Ensure the log messages from operators during parsing go somewhere (#26779)
    
    * Ensure the log messages from operators during parsing go somewhere
    
    While investigating #26599 and the change from AIP-45, I noticed that
    these warning messages weren't new! The only thing that was new was that
    we started seeing them.
    
    This is because the logger for BaseOperator and all subclasses is
    `airflow.task.operators`, and the `airflow.task` logger is not
    configured (with `set_context()`) until we have a TaskInstance, so it
    just dropped all messages on the floor!
    
    This changes it so that log messages are propagated to parent loggers by
    default, but when we configure a context (and thus have a file to write
    to) we stop that. A similar change was made for the `airflow.processor`
    logger (but that is unlikely to suffer the same fate).
    
    * Give a real row count value so logs don't fail
    
    The ArangoDB sensor test was logging a mock object, which previously was
    getting dropped before emitting, but with this change now fails with
    "Mock is not an integer" when attempting the `%d` interpolation.
    
    To avoid making the mock overly specific (`arangodb_client_for_test.db.`
    `return_value.aql.execute.return_value.count.return_value`!) I have
    changed the test to mock the hook entirely (which is already tested).
    
    (cherry picked from commit 7363e35c9dbb9860eabf2444307f4d6f8140ab70)
---
 airflow/config_templates/airflow_local_settings.py |  6 ++++--
 airflow/utils/log/file_processor_handler.py        |  3 +++
 airflow/utils/log/file_task_handler.py             |  3 +++
 airflow/utils/log/logging_mixin.py                 | 18 ++++++++++--------
 tests/conftest.py                                  | 11 +++++++++++
 tests/models/test_baseoperator.py                  | 10 ++++++++++
 tests/providers/arangodb/sensors/test_arangodb.py  |  8 ++++----
 7 files changed, 45 insertions(+), 14 deletions(-)

diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index e08274de31..317544ca7e 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -120,12 +120,14 @@ DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
         'airflow.processor': {
             'handlers': ['processor_to_stdout' if DAG_PROCESSOR_LOG_TARGET == "stdout" else 'processor'],
             'level': LOG_LEVEL,
-            'propagate': False,
+            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
+            'propagate': True,
         },
         'airflow.task': {
             'handlers': ['task'],
             'level': LOG_LEVEL,
-            'propagate': False,
+            # Set to true here (and reset via set_context) so that if no file is configured we still get logs!
+            'propagate': True,
             'filters': ['mask_secrets'],
         },
         'flask_appbuilder': {
diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py
index e9d2923978..11e473ecdf 100644
--- a/airflow/utils/log/file_processor_handler.py
+++ b/airflow/utils/log/file_processor_handler.py
@@ -24,6 +24,7 @@ from pathlib import Path
 
 from airflow import settings
 from airflow.utils.helpers import parse_template_string
+from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 
 
@@ -64,6 +65,8 @@ class FileProcessorHandler(logging.Handler):
             self._symlink_latest_log_directory()
             self._cur_date = datetime.today()
 
+        return DISABLE_PROPOGATE
+
     def emit(self, record):
         if self.handler is not None:
             self.handler.emit(record)
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 80addd3ded..a84dabf7d1 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -29,6 +29,7 @@ from airflow.configuration import AirflowConfigException, conf
 from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.utils.context import Context
 from airflow.utils.helpers import parse_template_string, render_template_to_string
+from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 from airflow.utils.session import create_session
 
@@ -72,6 +73,8 @@ class FileTaskHandler(logging.Handler):
             self.handler.setFormatter(self.formatter)
         self.handler.setLevel(self.level)
 
+        return DISABLE_PROPOGATE
+
     def emit(self, record):
         if self.handler:
             self.handler.emit(record)
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
index 89bddb3558..b54a6f3baa 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -28,6 +28,9 @@ from typing import IO
 # 7-bit C1 ANSI escape sequences
 ANSI_ESCAPE = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')
 
+# Private: A sentinel object that a handler's `set_context` returns to turn off logger propagation
+DISABLE_PROPOGATE = object()
+
 
 def remove_escape_codes(text: str) -> str:
     """
@@ -179,15 +182,14 @@ def set_context(logger, value):
     :param logger: logger
     :param value: value to set
     """
-    _logger = logger
-    while _logger:
-        for handler in _logger.handlers:
+    while logger:
+        for handler in logger.handlers:
             # Not all handlers need to have context passed in so we ignore
             # the error when handlers do not have set_context defined.
             set_context = getattr(handler, 'set_context', None)
-            if set_context:
-                set_context(value)
-        if _logger.propagate is True:
-            _logger = _logger.parent
+            if set_context and set_context(value) is DISABLE_PROPOGATE:
+                logger.propagate = False
+        if logger.propagate is True:
+            logger = logger.parent
         else:
-            _logger = None
+            break
diff --git a/tests/conftest.py b/tests/conftest.py
index cb5f449eb1..1026dc3c01 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -840,3 +840,14 @@ def create_log_template(request):
         request.addfinalizer(_delete_log_template)
 
     return _create_log_template
+
+
+@pytest.fixture()
+def reset_logging_config():
+    import logging.config
+
+    from airflow import settings
+    from airflow.utils.module_loading import import_string
+
+    logging_config = import_string(settings.LOGGING_CLASS_PATH)
+    logging.config.dictConfig(logging_config)
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index 6dd0e22396..f1728db9c9 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -685,6 +685,16 @@ class TestBaseOperator:
         op = BaseOperator(task_id="test_task", weight_rule="upstream")
         assert WeightRule.UPSTREAM == op.weight_rule
 
+    # ensure the default logging config is used for this test, no matter what ran before
+    @pytest.mark.usefixtures('reset_logging_config')
+    def test_logging_propogated_by_default(self, caplog):
+        """Test that when set_context hasn't been called that log records are emitted"""
+        BaseOperator(task_id="test").log.warning("test")
+        # This looks like "how could it fail" but this actually checks that the handler called `emit`. Testing
+        # the other case (that when we have set_context it goes to the file) is harder to achieve without
+        # leaking a lot of state.
+        assert caplog.messages == ["test"]
+
 
 def test_init_subclass_args():
     class InitSubclassOp(BaseOperator):
diff --git a/tests/providers/arangodb/sensors/test_arangodb.py b/tests/providers/arangodb/sensors/test_arangodb.py
index 3d03724634..be75e9845f 100644
--- a/tests/providers/arangodb/sensors/test_arangodb.py
+++ b/tests/providers/arangodb/sensors/test_arangodb.py
@@ -26,7 +26,7 @@ from airflow.providers.arangodb.sensors.arangodb import AQLSensor
 from airflow.utils import db, timezone
 
 DEFAULT_DATE = timezone.datetime(2017, 1, 1)
-arangodb_client_mock = Mock(name="arangodb_client_for_test")
+arangodb_hook_mock = Mock(name="arangodb_hook_for_test", **{'query.return_value.count.return_value': 1})
 
 
 class TestAQLSensor(unittest.TestCase):
@@ -46,9 +46,9 @@ class TestAQLSensor(unittest.TestCase):
         )
 
     @patch(
-        "airflow.providers.arangodb.hooks.arangodb.ArangoDBClient",
+        "airflow.providers.arangodb.sensors.arangodb.ArangoDBHook",
         autospec=True,
-        return_value=arangodb_client_mock,
+        return_value=arangodb_hook_mock,
     )
     def test_arangodb_document_created(self, arangodb_mock):
         query = "FOR doc IN students FILTER doc.name == 'judy' RETURN doc"
@@ -62,4 +62,4 @@ class TestAQLSensor(unittest.TestCase):
         )
 
         arangodb_tag_sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-        assert arangodb_mock.return_value.db.called
+        assert arangodb_hook_mock.query.return_value.count.called