You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/17 17:19:48 UTC
[airflow] branch master updated: Add Traceback in LogRecord in
``JSONFormatter`` (#15414)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 99ec208 Add Traceback in LogRecord in ``JSONFormatter`` (#15414)
99ec208 is described below
commit 99ec208024933d790272a09a6f20b241410a7df7
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Apr 17 18:19:23 2021 +0100
Add Traceback in LogRecord in ``JSONFormatter`` (#15414)
Currently, the traceback is not included when ``JSONFormatter`` is used
(`[logging] json_format = True`). However, the default Handler
includes the stack trace. To include the trace currently, we need to
add `json_fields = asctime, filename, lineno, levelname, message, exc_text`.
This is a bigger problem when using Elasticsearch Logging with:
```ini
[elasticsearch]
write_stdout = True
json_format = True
json_fields = asctime, filename, lineno, levelname, message, exc_text
[logging]
log_format = [%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s - %(exc_text)s
```
Running the following DAG with the above config won't show trace:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
with DAG(
dag_id='example_error',
schedule_interval=None,
start_date=days_ago(2),
) as dag:
def raise_error(**kwargs):
raise Exception("I am an exception from task logs")
task_1 = PythonOperator(
task_id='task_1',
python_callable=raise_error,
)
```
Before:
```
[2021-04-17 00:11:00,152] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: example_python_operator.print_the_context 2021-04-17T00:10:57.110189+00:00 [queued]>
...
...
[2021-04-17 00:11:00,298] {taskinstance.py:1482} ERROR - Task failed with exception
[2021-04-17 00:11:00,300] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=example_python_operator, task_id=print_the_context, execution_date=20210417T001057, start_date=20210417T001100, end_date=20210417T001100
[2021-04-17 00:11:00,325] {local_task_job.py:146} INFO - Task exited with return code 1
```
After:
```
[2021-04-17 00:11:00,152] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: example_python_operator.print_the_context 2021-04-17T00:10:57.110189+00:00 [queued]>
...
...
[2021-04-17 00:11:00,298] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 117, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 128, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/eg-2.py", line 25, in print_context
raise Exception("I am an exception from task logs")
Exception: I am an exception from task logs
[2021-04-17 00:11:00,300] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=example_python_operator, task_id=print_the_context, execution_date=20210417T001057, start_date=20210417T001100, end_date=20210417T001100
[2021-04-17 00:11:00,325] {local_task_job.py:146} INFO - Task exited with return code 1
```
---
airflow/utils/log/json_formatter.py | 11 +++++++++++
tests/utils/log/test_json_formatter.py | 18 ++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git a/airflow/utils/log/json_formatter.py b/airflow/utils/log/json_formatter.py
index 73d4619..f271eba 100644
--- a/airflow/utils/log/json_formatter.py
+++ b/airflow/utils/log/json_formatter.py
@@ -43,5 +43,16 @@ class JSONFormatter(logging.Formatter):
def format(self, record):
super().format(record)
record_dict = {label: getattr(record, label, None) for label in self.json_fields}
+ if "message" in self.json_fields:
+ msg = record_dict["message"]
+ if record.exc_text:
+ if msg[-1:] != "\n":
+ msg = msg + "\n"
+ msg = msg + record.exc_text
+ if record.stack_info:
+ if msg[-1:] != "\n":
+ msg = msg + "\n"
+ msg = msg + self.formatStack(record.stack_info)
+ record_dict["message"] = msg
merged_record = merge_dicts(record_dict, self.extras)
return json.dumps(merged_record)
diff --git a/tests/utils/log/test_json_formatter.py b/tests/utils/log/test_json_formatter.py
index b25d11b..511e8e0 100644
--- a/tests/utils/log/test_json_formatter.py
+++ b/tests/utils/log/test_json_formatter.py
@@ -20,6 +20,7 @@
Module for all tests airflow.utils.log.json_formatter.JSONFormatter
"""
import json
+import sys
import unittest
from logging import makeLogRecord
@@ -63,3 +64,20 @@ class TestJSONFormatter(unittest.TestCase):
json_fmt = JSONFormatter(json_fields=["label"], extras={'pod_extra': 'useful_message'})
# compare as a dicts to not fail on sorting errors
assert json.loads(json_fmt.format(log_record)) == {"label": "value", "pod_extra": "useful_message"}
+
+ def test_format_with_exception(self):
+ """
+ Test exception is included in the message when using JSONFormatter
+ """
+ try:
+ raise RuntimeError("message")
+ except RuntimeError:
+ exc_info = sys.exc_info()
+
+ log_record = makeLogRecord({"exc_info": exc_info, "message": "Some msg"})
+ json_fmt = JSONFormatter(json_fields=["message"])
+
+ log_fmt = json.loads(json_fmt.format(log_record))
+ assert "message" in log_fmt
+ assert "Traceback (most recent call last)" in log_fmt["message"]
+ assert 'raise RuntimeError("message")' in log_fmt["message"]