Posted to commits@airflow.apache.org by jh...@apache.org on 2022/09/07 10:07:08 UTC
[airflow] branch main updated: Add option of sending DAG parser logs to stdout. (#25754)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dc94afd7f8 Add option of sending DAG parser logs to stdout. (#25754)
dc94afd7f8 is described below
commit dc94afd7f816f248b21bb49cfc38088692c595d0
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Wed Sep 7 05:06:58 2022 -0500
Add option of sending DAG parser logs to stdout. (#25754)
* Add option of sending DAG parser logs to stdout.
* Make long config line into two lines.
* Reverse order of tests.
* Update tests with new changes.
---
airflow/config_templates/airflow_local_settings.py | 15 +++++-
airflow/config_templates/config.yml | 15 ++++++
airflow/config_templates/default_airflow.cfg | 6 +++
airflow/dag_processing/processor.py | 55 ++++++++++++----------
tests/dag_processing/test_processor.py | 48 ++++++++++++++++++-
5 files changed, 113 insertions(+), 26 deletions(-)
diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 16f225d38b..9e145a0b8b 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -37,6 +37,7 @@ LOG_LEVEL: str = conf.get_mandatory_value('logging', 'LOGGING_LEVEL').upper()
FAB_LOG_LEVEL: str = conf.get_mandatory_value('logging', 'FAB_LOGGING_LEVEL').upper()
LOG_FORMAT: str = conf.get_mandatory_value('logging', 'LOG_FORMAT')
+DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value('logging', 'DAG_PROCESSOR_LOG_FORMAT')
LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
'logging', 'LOG_FORMATTER_CLASS', fallback='airflow.utils.log.timezone_aware.TimezoneAware'
@@ -48,6 +49,8 @@ COLORED_LOG: bool = conf.getboolean('logging', 'COLORED_CONSOLE_LOG')
COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value('logging', 'COLORED_FORMATTER_CLASS')
+DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value('logging', 'DAG_PROCESSOR_LOG_TARGET')
+
BASE_LOG_FOLDER: str = conf.get_mandatory_value('logging', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
@@ -75,6 +78,10 @@ DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
'class': COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
},
+ 'source_processor': {
+ 'format': DAG_PROCESSOR_LOG_FORMAT,
+ 'class': LOG_FORMATTER_CLASS,
+ },
},
'filters': {
'mask_secrets': {
@@ -101,10 +108,16 @@ DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
'filters': ['mask_secrets'],
},
+ 'processor_to_stdout': {
+ 'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
+ 'formatter': 'source_processor',
+ 'stream': 'sys.stdout',
+ 'filters': ['mask_secrets'],
+ },
},
'loggers': {
'airflow.processor': {
- 'handlers': ['processor'],
+ 'handlers': ['processor_to_stdout' if DAG_PROCESSOR_LOG_TARGET == "stdout" else 'processor'],
'level': LOG_LEVEL,
'propagate': False,
},
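[Editor's note: the handler swap above happens entirely inside Airflow's DEFAULT_LOGGING_CONFIG dict. Below is a minimal standalone sketch of the same pattern, not Airflow's actual config: the hard-coded flag stands in for the conf.get_mandatory_value('logging', 'DAG_PROCESSOR_LOG_TARGET') lookup, and stdlib logging.StreamHandler stands in for Airflow's RedirectStdHandler.]

    import logging
    import logging.config

    DAG_PROCESSOR_LOG_TARGET = "stdout"  # stand-in for the conf lookup

    LOGGING_CONFIG = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "source_processor": {
                "format": "[%(asctime)s] [SOURCE:DAG_PROCESSOR] %(levelname)s - %(message)s",
            },
        },
        "handlers": {
            # Stand-in for the file-based 'processor' handler.
            "processor": {"class": "logging.NullHandler"},
            "processor_to_stdout": {
                "class": "logging.StreamHandler",
                "formatter": "source_processor",
                "stream": "ext://sys.stdout",
            },
        },
        "loggers": {
            "airflow.processor": {
                # Same one-line selection as in the diff above.
                "handlers": [
                    "processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"
                ],
                "level": "INFO",
                "propagate": False,
            },
        },
    }

    logging.config.dictConfig(LOGGING_CONFIG)
    logging.getLogger("airflow.processor").info("parsed a DAG file")

Running this prints the [SOURCE:DAG_PROCESSOR]-formatted line to stdout; flipping the flag to "file" routes records to the NullHandler stand-in instead.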
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 4537b62d6b..a6d19e7e2c 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -659,6 +659,21 @@
type: string
example: ~
default: "%%(asctime)s %%(levelname)s - %%(message)s"
+ - name: dag_processor_log_target
+ description: Where to send dag parser logs. If "file",
+ logs are sent to log files defined by child_process_log_directory.
+ version_added: 2.4.0
+ type: string
+ example: ~
+ default: "file"
+ - name: dag_processor_log_format
+ description: |
+ Format of Dag Processor Log line
+ version_added: 2.3.4
+ type: string
+ example: ~
+ default: "[%%(asctime)s] [SOURCE:DAG_PROCESSOR]
+ {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s"
- name: log_formatter_class
description: ~
version_added: 2.3.4
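[Editor's note on the doubled percent signs in the defaults above: Airflow config values pass through Python's ConfigParser, whose basic interpolation treats % as special, so %%(asctime)s in the template becomes %(asctime)s in the effective log format. The doubled braces appear to serve a similar escaping purpose, since the cfg template is itself rendered with str.format. The escaping can be confirmed with stdlib ConfigParser alone:]

    from configparser import ConfigParser

    # %% in the raw value comes back as a single % when the option is read.
    raw = "[logging]\nlog_format = [%%(asctime)s] %%(levelname)s - %%(message)s\n"
    parser = ConfigParser()
    parser.read_string(raw)
    print(parser.get("logging", "log_format"))
    # -> [%(asctime)s] %(levelname)s - %(message)s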
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 7cd116369e..2c88a759c3 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -366,6 +366,12 @@ colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatte
# Format of Log line
log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
+
+# Where to send dag parser logs. If "file", logs are sent to log files defined by child_process_log_directory.
+dag_processor_log_target = file
+
+# Format of Dag Processor Log line
+dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
# Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter
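[Editor's note: to try the new behavior once it ships, set the option in the [logging] section of airflow.cfg, or equivalently via an environment variable following Airflow's usual AIRFLOW__SECTION__KEY convention:]

    [logging]
    dag_processor_log_target = stdout

    # or:
    AIRFLOW__LOGGING__DAG_PROCESSOR_LOG_TARGET=stdout

"file" remains the default, preserving the existing behavior of writing to child_process_log_directory.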
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index fa1eb46c29..dc6da4b052 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -143,31 +143,38 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
set_context(log, file_path)
setproctitle(f"airflow scheduler - DagFileProcessor {file_path}")
+
+ def _handle_dag_file_processing():
+ # Re-configure the ORM engine as there are issues with multiple processes
+ settings.configure_orm()
+
+ # Change the thread name to differentiate log lines. This is
+ # really a separate process, but changing the name of the
+ # process doesn't work, so changing the thread name instead.
+ threading.current_thread().name = thread_name
+
+ log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
+ dag_file_processor = DagFileProcessor(dag_ids=dag_ids, dag_directory=dag_directory, log=log)
+ result: Tuple[int, int] = dag_file_processor.process_file(
+ file_path=file_path,
+ pickle_dags=pickle_dags,
+ callback_requests=callback_requests,
+ )
+ result_channel.send(result)
+
try:
- # redirect stdout/stderr to log
- with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
- StreamLogWriter(log, logging.WARN)
- ), Stats.timer() as timer:
- # Re-configure the ORM engine as there are issues with multiple processes
- settings.configure_orm()
-
- # Change the thread name to differentiate log lines. This is
- # really a separate process, but changing the name of the
- # process doesn't work, so changing the thread name instead.
- threading.current_thread().name = thread_name
-
- log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
- dag_file_processor = DagFileProcessor(
- dag_ids=dag_ids,
- dag_directory=dag_directory,
- log=log,
- )
- result: Tuple[int, int] = dag_file_processor.process_file(
- file_path=file_path,
- pickle_dags=pickle_dags,
- callback_requests=callback_requests,
- )
- result_channel.send(result)
+ DAG_PROCESSOR_LOG_TARGET = conf.get_mandatory_value('logging', 'DAG_PROCESSOR_LOG_TARGET')
+ if DAG_PROCESSOR_LOG_TARGET == "stdout":
+ with Stats.timer() as timer:
+ _handle_dag_file_processing()
+ else:
+ # The following line ensures that stdout goes to the same destination as the logs. If stdout
+ # gets sent to logs and logs are sent to stdout, this leads to an infinite loop. This
+ # necessitates this conditional based on the value of DAG_PROCESSOR_LOG_TARGET.
+ with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
+ StreamLogWriter(log, logging.WARN)
+ ), Stats.timer() as timer:
+ _handle_dag_file_processing()
log.info("Processing %s took %.3f seconds", file_path, timer.duration)
except Exception:
# Log exceptions through the logging framework.
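[Editor's note: the infinite-loop comment is the crux of the conditional. Airflow's RedirectStdHandler resolves sys.stdout at emit time, so if processor logs went to stdout while stdout was simultaneously redirected into the logger via StreamLogWriter, every record would re-enter the logger. A toy sketch of the safe direction follows; it is not Airflow code, and LogWriter is a simplified stand-in for airflow.utils.log.logging_mixin.StreamLogWriter:]

    import logging
    import sys
    from contextlib import redirect_stdout

    # Forwards anything print()ed into the logging framework.
    class LogWriter:
        def __init__(self, logger, level):
            self.logger = logger
            self.level = level

        def write(self, message):
            if message.strip():
                self.logger.log(self.level, message.strip())

        def flush(self):
            pass

    log = logging.getLogger("toy.processor")
    # This handler captures the *real* stdout at construction time, so log
    # records bypass the redirection below. A handler that looked up
    # sys.stdout at emit time (as RedirectStdHandler does) would feed each
    # record back into LogWriter.write() -- the loop the conditional avoids.
    log.addHandler(logging.StreamHandler(sys.stdout))
    log.setLevel(logging.INFO)

    with redirect_stdout(LogWriter(log, logging.INFO)):
        print("this print() becomes a log record")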
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index b0e09d0417..a41a97bd5a 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -28,7 +28,7 @@ from airflow import settings
from airflow.callbacks.callback_requests import TaskCallbackRequest
from airflow.configuration import TEST_DAGS_FOLDER, conf
from airflow.dag_processing.manager import DagFileProcessorAgent
-from airflow.dag_processing.processor import DagFileProcessor
+from airflow.dag_processing.processor import DagFileProcessor, DagFileProcessorProcess
from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance, errors
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance
@@ -788,6 +788,52 @@ class TestDagFileProcessor:
assert import_error.stacktrace == expected_stacktrace.format(invalid_dag_filename)
session.rollback()
+ @conf_vars({("logging", "dag_processor_log_target"): "stdout"})
+ @mock.patch('airflow.dag_processing.processor.settings.dispose_orm', MagicMock)
+ @mock.patch('airflow.dag_processing.processor.redirect_stdout')
+ def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for_file):
+ processor = DagFileProcessorProcess(
+ file_path='abc.txt',
+ pickle_dags=False,
+ dag_ids=[],
+ dag_directory=[],
+ callback_requests=[],
+ )
+ processor._run_file_processor(
+ result_channel=MagicMock(),
+ parent_channel=MagicMock(),
+ file_path="fake_file_path",
+ pickle_dags=False,
+ dag_ids=[],
+ thread_name="fake_thread_name",
+ callback_requests=[],
+ dag_directory=[],
+ )
+ mock_redirect_stdout_for_file.assert_not_called()
+
+ @conf_vars({("logging", "dag_processor_log_target"): "file"})
+ @mock.patch('airflow.dag_processing.processor.settings.dispose_orm', MagicMock)
+ @mock.patch('airflow.dag_processing.processor.redirect_stdout')
+ def test_dag_parser_output_when_logging_to_file(self, mock_redirect_stdout_for_file):
+ processor = DagFileProcessorProcess(
+ file_path='abc.txt',
+ pickle_dags=False,
+ dag_ids=[],
+ dag_directory=[],
+ callback_requests=[],
+ )
+ processor._run_file_processor(
+ result_channel=MagicMock(),
+ parent_channel=MagicMock(),
+ file_path="fake_file_path",
+ pickle_dags=False,
+ dag_ids=[],
+ thread_name="fake_thread_name",
+ callback_requests=[],
+ dag_directory=[],
+ )
+ mock_redirect_stdout_for_file.assert_called_once()
+
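[Editor's note: for anyone verifying the change locally, both new tests can be selected by name with a standard pytest -k expression, assuming a working Airflow dev environment:]

    pytest tests/dag_processing/test_processor.py -k dag_parser_output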
class TestProcessorAgent:
@pytest.fixture(autouse=True)