Posted to commits@airflow.apache.org by jh...@apache.org on 2022/09/07 10:07:08 UTC

[airflow] branch main updated: Add option of sending DAG parser logs to stdout. (#25754)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dc94afd7f8 Add option of sending DAG parser logs to stdout. (#25754)
dc94afd7f8 is described below

commit dc94afd7f816f248b21bb49cfc38088692c595d0
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Wed Sep 7 05:06:58 2022 -0500

    Add option of sending DAG parser logs to stdout. (#25754)
    
    * Add option of sending DAG parser logs to stdout.
    
    * Make long config line into two lines.
    
    * Reverse order of tests.
    
    * Update tests with new changes.
---
 airflow/config_templates/airflow_local_settings.py | 15 +++++-
 airflow/config_templates/config.yml                | 15 ++++++
 airflow/config_templates/default_airflow.cfg       |  6 +++
 airflow/dag_processing/processor.py                | 55 ++++++++++++----------
 tests/dag_processing/test_processor.py             | 48 ++++++++++++++++++-
 5 files changed, 113 insertions(+), 26 deletions(-)

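The change adds a new "dag_processor_log_target" option to the [logging] section. As a rough sketch of how an operator could opt in to stdout parser logs, assuming an Airflow build that includes this change and the standard AIRFLOW__<SECTION>__<KEY> environment-variable override (the snippet is illustrative and not part of this commit):

    # Illustrative only: enable stdout DAG-processor logs via the standard
    # AIRFLOW__<SECTION>__<KEY> environment-variable override. The variable
    # must be set before the scheduler / DAG processor process starts.
    import os

    os.environ["AIRFLOW__LOGGING__DAG_PROCESSOR_LOG_TARGET"] = "stdout"

    from airflow.configuration import conf

    # The logging template (airflow_local_settings.py) reads this value at import time.
    assert conf.get("logging", "dag_processor_log_target") == "stdout"
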
diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 16f225d38b..9e145a0b8b 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -37,6 +37,7 @@ LOG_LEVEL: str = conf.get_mandatory_value('logging', 'LOGGING_LEVEL').upper()
 FAB_LOG_LEVEL: str = conf.get_mandatory_value('logging', 'FAB_LOGGING_LEVEL').upper()
 
 LOG_FORMAT: str = conf.get_mandatory_value('logging', 'LOG_FORMAT')
+DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value('logging', 'DAG_PROCESSOR_LOG_FORMAT')
 
 LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
     'logging', 'LOG_FORMATTER_CLASS', fallback='airflow.utils.log.timezone_aware.TimezoneAware'
@@ -48,6 +49,8 @@ COLORED_LOG: bool = conf.getboolean('logging', 'COLORED_CONSOLE_LOG')
 
 COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value('logging', 'COLORED_FORMATTER_CLASS')
 
+DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value('logging', 'DAG_PROCESSOR_LOG_TARGET')
+
 BASE_LOG_FOLDER: str = conf.get_mandatory_value('logging', 'BASE_LOG_FOLDER')
 
 PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
@@ -75,6 +78,10 @@ DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
             'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
             'class': COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
         },
+        'source_processor': {
+            'format': DAG_PROCESSOR_LOG_FORMAT,
+            'class': LOG_FORMATTER_CLASS,
+        },
     },
     'filters': {
         'mask_secrets': {
@@ -101,10 +108,16 @@ DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
             'filename_template': PROCESSOR_FILENAME_TEMPLATE,
             'filters': ['mask_secrets'],
         },
+        'processor_to_stdout': {
+            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
+            'formatter': 'source_processor',
+            'stream': 'sys.stdout',
+            'filters': ['mask_secrets'],
+        },
     },
     'loggers': {
         'airflow.processor': {
-            'handlers': ['processor'],
+            'handlers': ['processor_to_stdout' if DAG_PROCESSOR_LOG_TARGET == "stdout" else 'processor'],
             'level': LOG_LEVEL,
             'propagate': False,
         },
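The wiring above gives the airflow.processor logger exactly one handler, chosen by the new setting. A self-contained sketch of the same selection pattern, with standard library handlers standing in for Airflow's RedirectStdHandler and FileProcessorHandler (the file name, level, and target value are illustrative):

    # Standalone illustration of the conditional handler selection.
    import logging.config

    DAG_PROCESSOR_LOG_TARGET = "stdout"  # in Airflow this comes from conf.get_mandatory_value()

    LOGGING_CONFIG = {
        "version": 1,
        "formatters": {
            "source_processor": {
                "format": "[%(asctime)s] [SOURCE:DAG_PROCESSOR] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s",
            },
        },
        "handlers": {
            "processor": {"class": "logging.FileHandler", "filename": "processor_demo.log"},
            "processor_to_stdout": {
                "class": "logging.StreamHandler",
                "formatter": "source_processor",
                "stream": "ext://sys.stdout",
            },
        },
        "loggers": {
            "airflow.processor": {
                # Same conditional as in DEFAULT_LOGGING_CONFIG above.
                "handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
                "level": "INFO",
                "propagate": False,
            },
        },
    }

    logging.config.dictConfig(LOGGING_CONFIG)
    logging.getLogger("airflow.processor").info("parsed example_dag.py")
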
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 4537b62d6b..a6d19e7e2c 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -659,6 +659,21 @@
       type: string
       example: ~
       default: "%%(asctime)s %%(levelname)s - %%(message)s"
+    - name: dag_processor_log_target
+      description: Where to send dag parser logs. If "file",
+        logs are sent to log files defined by child_process_log_directory.
+      version_added: 2.4.0
+      type: string
+      example: ~
+      default: "file"
+    - name: dag_processor_log_format
+      description: |
+        Format of Dag Processor Log line
+      version_added: 2.3.4
+      type: string
+      example: ~
+      default: "[%%(asctime)s] [SOURCE:DAG_PROCESSOR]
+        {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s"
     - name: log_formatter_class
       description: ~
       version_added: 2.3.4
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 7cd116369e..2c88a759c3 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -366,6 +366,12 @@ colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatte
 # Format of Log line
 log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
 simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
+
+# Where to send dag parser logs. If "file", logs are sent to log files defined by child_process_log_directory.
+dag_processor_log_target = file
+
+# Format of Dag Processor Log line
+dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
 log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
 
 # Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter
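The doubled %% and {{ }} above are escaping in the config template; the effective format string uses single characters. A small sketch of what a line tagged with the new DAG-processor format looks like (the logger name and output shown are approximate):

    # Illustrative only: applying the effective dag_processor_log_format.
    import logging
    import sys

    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(
        logging.Formatter(
            "[%(asctime)s] [SOURCE:DAG_PROCESSOR] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
        )
    )
    log = logging.getLogger("dag_processor_format_demo")
    log.addHandler(handler)
    log.setLevel(logging.INFO)

    log.info("Started process to work on /dags/example_dag.py")
    # Prints something like:
    # [2022-09-07 10:07:08,123] [SOURCE:DAG_PROCESSOR] {demo.py:17} INFO - Started process to work on /dags/example_dag.py
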
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index fa1eb46c29..dc6da4b052 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -143,31 +143,38 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
 
         set_context(log, file_path)
         setproctitle(f"airflow scheduler - DagFileProcessor {file_path}")
+
+        def _handle_dag_file_processing():
+            # Re-configure the ORM engine as there are issues with multiple processes
+            # settings.configure_orm()
+
+            # Change the thread name to differentiate log lines. This is
+            # really a separate process, but changing the name of the
+            # process doesn't work, so changing the thread name instead.
+            threading.current_thread().name = thread_name
+
+            log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
+            dag_file_processor = DagFileProcessor(dag_ids=dag_ids, dag_directory=dag_directory, log=log)
+            result: Tuple[int, int] = dag_file_processor.process_file(
+                file_path=file_path,
+                pickle_dags=pickle_dags,
+                callback_requests=callback_requests,
+            )
+            result_channel.send(result)
+
         try:
-            # redirect stdout/stderr to log
-            with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
-                StreamLogWriter(log, logging.WARN)
-            ), Stats.timer() as timer:
-                # Re-configure the ORM engine as there are issues with multiple processes
-                settings.configure_orm()
-
-                # Change the thread name to differentiate log lines. This is
-                # really a separate process, but changing the name of the
-                # process doesn't work, so changing the thread name instead.
-                threading.current_thread().name = thread_name
-
-                log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
-                dag_file_processor = DagFileProcessor(
-                    dag_ids=dag_ids,
-                    dag_directory=dag_directory,
-                    log=log,
-                )
-                result: Tuple[int, int] = dag_file_processor.process_file(
-                    file_path=file_path,
-                    pickle_dags=pickle_dags,
-                    callback_requests=callback_requests,
-                )
-                result_channel.send(result)
+            DAG_PROCESSOR_LOG_TARGET = conf.get_mandatory_value('logging', 'DAG_PROCESSOR_LOG_TARGET')
+            if DAG_PROCESSOR_LOG_TARGET == "stdout":
+                with Stats.timer() as timer:
+                    _handle_dag_file_processing()
+            else:
+                # The following line ensures that stdout goes to the same destination as the logs. If stdout
+                # gets sent to logs and logs are sent to stdout, this leads to an infinite loop. This
+                # necessitates this conditional based on the value of DAG_PROCESSOR_LOG_TARGET.
+                with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
+                    StreamLogWriter(log, logging.WARN)
+                ), Stats.timer() as timer:
+                    _handle_dag_file_processing()
             log.info("Processing %s took %.3f seconds", file_path, timer.duration)
         except Exception:
             # Log exceptions through the logging framework.
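The conditional above exists because the "file" branch redirects stdout into the logger; if that logger in turn wrote to stdout, every emitted record would be re-captured and re-logged in a loop, which is why the "stdout" target skips the redirect entirely. A minimal sketch of the redirect pattern used in the "file" branch, with a plain file-like wrapper standing in for StreamLogWriter (names and the log file are illustrative):

    # Standalone sketch of the "file" branch: stdout writes are routed into a
    # logger, so print() calls made while parsing a DAG end up in the log file.
    import contextlib
    import logging

    class StreamToLogger:
        """Minimal stand-in for airflow.utils.log.logging_mixin.StreamLogWriter."""

        def __init__(self, logger, level):
            self.logger = logger
            self.level = level

        def write(self, message):
            if message.strip():
                self.logger.log(self.level, message.rstrip())

        def flush(self):
            pass

    logging.basicConfig(filename="processor_demo.log", level=logging.INFO)
    log = logging.getLogger("demo.processor")

    with contextlib.redirect_stdout(StreamToLogger(log, logging.INFO)):
        print("this goes to processor_demo.log, not to the console")
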
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index b0e09d0417..a41a97bd5a 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -28,7 +28,7 @@ from airflow import settings
 from airflow.callbacks.callback_requests import TaskCallbackRequest
 from airflow.configuration import TEST_DAGS_FOLDER, conf
 from airflow.dag_processing.manager import DagFileProcessorAgent
-from airflow.dag_processing.processor import DagFileProcessor
+from airflow.dag_processing.processor import DagFileProcessor, DagFileProcessorProcess
 from airflow.models import DagBag, DagModel, SlaMiss, TaskInstance, errors
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance
@@ -788,6 +788,52 @@ class TestDagFileProcessor:
             assert import_error.stacktrace == expected_stacktrace.format(invalid_dag_filename)
             session.rollback()
 
+    @conf_vars({("logging", "dag_processor_log_target"): "stdout"})
+    @mock.patch('airflow.dag_processing.processor.settings.dispose_orm', MagicMock)
+    @mock.patch('airflow.dag_processing.processor.redirect_stdout')
+    def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for_file):
+        processor = DagFileProcessorProcess(
+            file_path='abc.txt',
+            pickle_dags=False,
+            dag_ids=[],
+            dag_directory=[],
+            callback_requests=[],
+        )
+        processor._run_file_processor(
+            result_channel=MagicMock(),
+            parent_channel=MagicMock(),
+            file_path="fake_file_path",
+            pickle_dags=False,
+            dag_ids=[],
+            thread_name="fake_thread_name",
+            callback_requests=[],
+            dag_directory=[],
+        )
+        mock_redirect_stdout_for_file.assert_not_called()
+
+    @conf_vars({("logging", "dag_processor_log_target"): "file"})
+    @mock.patch('airflow.dag_processing.processor.settings.dispose_orm', MagicMock)
+    @mock.patch('airflow.dag_processing.processor.redirect_stdout')
+    def test_dag_parser_output_when_logging_to_file(self, mock_redirect_stdout_for_file):
+        processor = DagFileProcessorProcess(
+            file_path='abc.txt',
+            pickle_dags=False,
+            dag_ids=[],
+            dag_directory=[],
+            callback_requests=[],
+        )
+        processor._run_file_processor(
+            result_channel=MagicMock(),
+            parent_channel=MagicMock(),
+            file_path="fake_file_path",
+            pickle_dags=False,
+            dag_ids=[],
+            thread_name="fake_thread_name",
+            callback_requests=[],
+            dag_directory=[],
+        )
+        mock_redirect_stdout_for_file.assert_called_once()
+
 
 class TestProcessorAgent:
     @pytest.fixture(autouse=True)