You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/17 17:19:48 UTC

[airflow] branch master updated: Add Traceback in LogRecord in ``JSONFormatter`` (#15414)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 99ec208  Add Traceback in LogRecord in ``JSONFormatter`` (#15414)
99ec208 is described below

commit 99ec208024933d790272a09a6f20b241410a7df7
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Apr 17 18:19:23 2021 +0100

    Add Traceback in LogRecord in ``JSONFormatter`` (#15414)
    
    Currently, the traceback is not included when ``JSONFormatter`` is used
    (`[elasticsearch] json_format = True`), whereas the default handler
    includes the stack trace. At present, the only way to include the trace
    is to add `exc_text` to the JSON fields, e.g.
    `json_fields = asctime, filename, lineno, levelname, message, exc_text`.
    
    This is a bigger problem when using Elasticsearch logging with the following configuration:
    
    ```ini
    [elasticsearch]
    write_stdout = True
    json_format = True
    json_fields = asctime, filename, lineno, levelname, message, exc_text
    
    [logging]
    log_format = [%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s - %(exc_text)s
    ```
    
    Running the following DAG with the above config does not show the traceback:
    
    ```python
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
    
    with DAG(
        dag_id='example_error',
        schedule_interval=None,
        start_date=days_ago(2),
    ) as dag:
    
        def raise_error(**kwargs):
            raise Exception("I am an exception from task logs")
    
        task_1 = PythonOperator(
            task_id='task_1',
            python_callable=raise_error,
        )
    ```
    
    Before (logs captured from an equivalent DAG, so the dag_id/task_id differ from the example above):
    
    ```
    [2021-04-17 00:11:00,152] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: example_python_operator.print_the_context 2021-04-17T00:10:57.110189+00:00 [queued]>
    ...
    ...
    [2021-04-17 00:11:00,298] {taskinstance.py:1482} ERROR - Task failed with exception
    [2021-04-17 00:11:00,300] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=example_python_operator, task_id=print_the_context, execution_date=20210417T001057, start_date=20210417T001100, end_date=20210417T001100
    [2021-04-17 00:11:00,325] {local_task_job.py:146} INFO - Task exited with return code 1
    ```
    
    After:
    
    ```
    [2021-04-17 00:11:00,152] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: example_python_operator.print_the_context 2021-04-17T00:10:57.110189+00:00 [queued]>
    ...
    ...
    [2021-04-17 00:11:00,298] {taskinstance.py:1482} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
        self._prepare_and_execute_task_with_callbacks(context, task)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
        result = self._execute_task(context, task_copy)
      File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
        result = task_copy.execute(context=context)
      File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 117, in execute
        return_value = self.execute_callable()
      File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 128, in execute_callable
        return self.python_callable(*self.op_args, **self.op_kwargs)
      File "/usr/local/airflow/dags/eg-2.py", line 25, in print_context
        raise Exception("I am an exception from task logs")
    Exception: I am an exception from task logs
    [2021-04-17 00:11:00,300] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=example_python_operator, task_id=print_the_context, execution_date=20210417T001057, start_date=20210417T001100, end_date=20210417T001100
    [2021-04-17 00:11:00,325] {local_task_job.py:146} INFO - Task exited with return code 1
    ```
---
 airflow/utils/log/json_formatter.py    | 11 +++++++++++
 tests/utils/log/test_json_formatter.py | 18 ++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/airflow/utils/log/json_formatter.py b/airflow/utils/log/json_formatter.py
index 73d4619..f271eba 100644
--- a/airflow/utils/log/json_formatter.py
+++ b/airflow/utils/log/json_formatter.py
@@ -43,5 +43,16 @@ class JSONFormatter(logging.Formatter):
     def format(self, record):
         super().format(record)
         record_dict = {label: getattr(record, label, None) for label in self.json_fields}
+        if "message" in self.json_fields:
+            msg = record_dict["message"]
+            if record.exc_text:
+                if msg[-1:] != "\n":
+                    msg = msg + "\n"
+                msg = msg + record.exc_text
+            if record.stack_info:
+                if msg[-1:] != "\n":
+                    msg = msg + "\n"
+                msg = msg + self.formatStack(record.stack_info)
+            record_dict["message"] = msg
         merged_record = merge_dicts(record_dict, self.extras)
         return json.dumps(merged_record)
diff --git a/tests/utils/log/test_json_formatter.py b/tests/utils/log/test_json_formatter.py
index b25d11b..511e8e0 100644
--- a/tests/utils/log/test_json_formatter.py
+++ b/tests/utils/log/test_json_formatter.py
@@ -20,6 +20,7 @@
 Module for all tests airflow.utils.log.json_formatter.JSONFormatter
 """
 import json
+import sys
 import unittest
 from logging import makeLogRecord
 
@@ -63,3 +64,20 @@ class TestJSONFormatter(unittest.TestCase):
         json_fmt = JSONFormatter(json_fields=["label"], extras={'pod_extra': 'useful_message'})
         # compare as a dicts to not fail on sorting errors
         assert json.loads(json_fmt.format(log_record)) == {"label": "value", "pod_extra": "useful_message"}
+
+    def test_format_with_exception(self):
+        """
+        Test exception is included in the message when using JSONFormatter
+        """
+        try:
+            raise RuntimeError("message")
+        except RuntimeError:
+            exc_info = sys.exc_info()
+
+        log_record = makeLogRecord({"exc_info": exc_info, "message": "Some msg"})
+        json_fmt = JSONFormatter(json_fields=["message"])
+
+        log_fmt = json.loads(json_fmt.format(log_record))
+        assert "message" in log_fmt
+        assert "Traceback (most recent call last)" in log_fmt["message"]
+        assert 'raise RuntimeError("message")' in log_fmt["message"]