Posted to commits@airflow.apache.org by po...@apache.org on 2020/06/29 13:21:43 UTC

[airflow] 37/37: Fix logging issue when running tasks (#9363)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 845bcbefa57b6ed12be337128073e1d766910030
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu Jun 25 13:32:12 2020 +0100

    Fix logging issue when running tasks (#9363)
    
    (cherry picked from commit 61f4e9e82a0ac0680971eb6298f089a16c32ceb9)
---
 airflow/bin/cli.py                               | 31 +++++++-
 airflow/settings.py                              |  7 ++
 airflow/task/task_runner/standard_task_runner.py |  6 +-
 tests/cli/test_cli.py                            | 93 ++++++++++++++++++++++++
 tests/dags/test_logging_in_dag.py                | 44 +++++++++++
 5 files changed, 178 insertions(+), 3 deletions(-)

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e70ccdd..b3ffea7 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -555,8 +555,35 @@ def run(args, dag=None):
     if args.interactive:
         _run(args, dag, ti)
     else:
-        with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
-            _run(args, dag, ti)
+        if settings.DONOT_MODIFY_HANDLERS:
+            with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
+                _run(args, dag, ti)
+        else:
+            # Get all the handlers from the 'airflow.task' logger
+            # Add these handlers to the root logger so that we can get logs from
+            # any custom loggers defined in the DAG
+            airflow_logger_handlers = logging.getLogger('airflow.task').handlers
+            root_logger = logging.getLogger()
+            root_logger_handlers = root_logger.handlers[:]  # copy, since removeHandler mutates the list
+
+            # Remove all handlers from the root logger to avoid duplicate logs
+            for handler in root_logger_handlers:
+                root_logger.removeHandler(handler)
+
+            for handler in airflow_logger_handlers:
+                root_logger.addHandler(handler)
+            root_logger.setLevel(logging.getLogger('airflow.task').level)
+
+            with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
+                _run(args, dag, ti)
+
+            # We need to restore the handlers to the loggers, as the Celery worker process
+            # can call this command multiple times; if we don't reset them,
+            # logs from the next task would go to the wrong place
+            for handler in airflow_logger_handlers:
+                root_logger.removeHandler(handler)
+            for handler in root_logger_handlers:
+                root_logger.addHandler(handler)
     logging.shutdown()
 
 
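For readers unfamiliar with the pattern in the cli.py hunk above: it temporarily points the
root logger at the 'airflow.task' handlers so that records from any logger (including custom
loggers defined in DAG code) land in the task log, then puts the original handlers back because
a long-lived Celery worker reuses the same process for many tasks. A minimal standalone sketch
of the same swap-and-restore idea in plain stdlib logging (the helper name is illustrative, not
Airflow's actual API):

    import logging

    def run_with_task_handlers(task_logger_name, fn):
        """Run fn() with the root logger temporarily borrowing the
        handlers (and level) of the named task logger, then restore."""
        task_logger = logging.getLogger(task_logger_name)
        root = logging.getLogger()
        saved_handlers = root.handlers[:]  # copy before mutating
        saved_level = root.level

        for handler in saved_handlers:
            root.removeHandler(handler)
        for handler in task_logger.handlers:
            root.addHandler(handler)
        root.setLevel(task_logger.level)

        try:
            fn()
        finally:
            # Restore the originals so the next task run in this
            # worker process does not log to the wrong place
            for handler in task_logger.handlers:
                root.removeHandler(handler)
            for handler in saved_handlers:
                root.addHandler(handler)
            root.setLevel(saved_level)

Unlike the patch, the sketch restores inside a finally block, so the handlers are put back even
if the wrapped call raises.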
diff --git a/airflow/settings.py b/airflow/settings.py
index bf2fb1d..513f192 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -400,3 +400,10 @@ STORE_SERIALIZED_DAGS = conf.getboolean('core', 'store_serialized_dags', fallbac
 # write rate.
 MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
     'core', 'min_serialized_dag_update_interval', fallback=30)
+
+# If donot_modify_handlers=True, we do not modify logging handlers in the task_run command.
+# If the flag is set to False, we remove all handlers from the root logger
+# and add all handlers from the 'airflow.task' logger to the root logger. This is done
+# to capture all logs from print and logging statements in DAG files before a task runs.
+# The handlers are restored after the task completes execution.
+DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False)
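For deployments that manage logging handlers themselves, the new flag can be switched on in
airflow.cfg. A sketch of the expected entry, matching the section and option name read by the
conf.getboolean call above:

    [logging]
    donot_modify_handlers = True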
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 4557af2..8138cfa 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+"""Standard task runner"""
 import os
 
 import psutil
@@ -25,7 +26,7 @@ from setproctitle import setproctitle
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
 from airflow.utils.helpers import reap_process_group
 
-CAN_FORK = hasattr(os, 'fork')
+CAN_FORK = hasattr(os, "fork")
 
 
 class StandardTaskRunner(BaseTaskRunner):
@@ -73,6 +74,9 @@ class StandardTaskRunner(BaseTaskRunner):
             # [1:] - remove "airflow" from the start of the command
             args = parser.parse_args(self._command[1:])
 
+            self.log.info('Running: %s', self._command)
+            self.log.info('Job %s: Subtask %s', self._task_instance.job_id, self._task_instance.task_id)
+
             proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
             if hasattr(args, "job_id"):
                 proc_title += " {0.job_id}"
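A side note on the proc_title template above: "{0.dag_id}" is str.format attribute access on the
first positional argument, so the parsed args object fills in the title directly. A small
illustrative sketch (the namedtuple stand-in is hypothetical, not the real parser output):

    from collections import namedtuple

    from setproctitle import setproctitle

    Args = namedtuple("Args", ["dag_id", "task_id", "execution_date", "job_id"])
    args = Args("example_dag", "example_task", "2020-01-01T00:00:00", "42")

    # "{0.dag_id}" expands to args.dag_id, and so on for the other fields
    proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
    if hasattr(args, "job_id"):
        proc_title += " {0.job_id}"
    setproctitle(proc_title.format(args))  # visible in `ps` output for the forked child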
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index e7523dc..a2a81ac 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -22,6 +22,7 @@ import io
 import logging
 import os
 
+from airflow.configuration import conf
 from parameterized import parameterized
 from six import StringIO, PY2
 import sys
@@ -530,6 +531,98 @@ class TestCLI(unittest.TestCase):
         )
 
 
+class TestLogsfromTaskRunCommand(unittest.TestCase):
+
+    def setUp(self):
+        self.dag_id = "test_logging_dag"
+        self.task_id = "test_task"
+        reset(self.dag_id)
+        self.execution_date_str = timezone.make_aware(datetime(2017, 1, 1)).isoformat()
+        self.log_dir = conf.get('core', 'base_log_folder')
+        self.log_filename = "{}/{}/{}/1.log".format(self.dag_id, self.task_id, self.execution_date_str)
+        self.ti_log_file_path = os.path.join(self.log_dir, self.log_filename)
+        self.parser = cli.CLIFactory.get_parser()
+        try:
+            os.remove(self.ti_log_file_path)
+        except OSError:
+            pass
+
+    def tearDown(self):
+        reset(self.dag_id)
+        try:
+            os.remove(self.ti_log_file_path)
+        except OSError:
+            pass
+
+    def assert_log_line(self, text, logs_list, expect_from_logging_mixin=False):
+        """
+        Get the log line and assert that exactly one entry exists with the given text. Also check
+        that "logging_mixin" does not appear in that log line, to avoid duplicate logging as below:
+        [2020-06-24 16:47:23,537] {logging_mixin.py:91} INFO - [2020-06-24 16:47:23,536] {python.py:135}
+        """
+        log_lines = [log for log in logs_list if text in log]
+        self.assertEqual(len(log_lines), 1)
+        log_line = log_lines[0]
+        if not expect_from_logging_mixin:
+            # Logs from print statements still show with logging_mixin.py as the filename
+            # Example: [2020-06-24 17:07:00,482] {logging_mixin.py:91} INFO - Log from Print statement
+            self.assertNotIn("logging_mixin.py", log_line)
+        return log_line
+
+    @unittest.skipIf(not hasattr(os, 'fork'), "Forking not available")
+    def test_logging_with_run_task(self):
+        # We are not using self.assertLogs because we want to verify what is actually stored
+        # in the log file, as that is what gets displayed
+
+        with conf_vars({('core', 'dags_folder'): os.path.join(TEST_DAG_FOLDER, self.dag_id)}):
+            cli.run(self.parser.parse_args([
+                'run', self.dag_id, self.task_id, '--local', self.execution_date_str]))
+
+        with open(self.ti_log_file_path) as l_file:
+            logs = l_file.read()
+
+        print(logs)     # In case of a test failure, this line shows the detailed log
+        logs_list = logs.splitlines()
+
+        self.assertIn("INFO - Started process", logs)
+        self.assertIn("Subtask {}".format(self.task_id), logs)
+        self.assertIn("standard_task_runner.py", logs)
+        self.assertIn("INFO - Running: ['airflow', 'run', '{}', "
+                      "'{}', '{}',".format(self.dag_id, self.task_id, self.execution_date_str), logs)
+
+        self.assert_log_line("Log from DAG Logger", logs_list)
+        self.assert_log_line("Log from TI Logger", logs_list)
+        self.assert_log_line("Log from Print statement", logs_list, expect_from_logging_mixin=True)
+
+        self.assertIn("INFO - Marking task as SUCCESS.dag_id={}, task_id={}, "
+                      "execution_date=20170101T000000".format(self.dag_id, self.task_id), logs)
+
+    @mock.patch("airflow.task.task_runner.standard_task_runner.CAN_FORK", False)
+    def test_logging_with_run_task_subprocess(self):
+        # We are not using self.assertLogs because we want to verify what is actually stored
+        # in the log file, as that is what gets displayed
+        with conf_vars({('core', 'dags_folder'): os.path.join(TEST_DAG_FOLDER, self.dag_id)}):
+            cli.run(self.parser.parse_args([
+                'run', self.dag_id, self.task_id, '--local', self.execution_date_str]))
+
+        with open(self.ti_log_file_path) as l_file:
+            logs = l_file.read()
+
+        print(logs)     # In case of a test failure, this line shows the detailed log
+        logs_list = logs.splitlines()
+
+        self.assertIn("Subtask {}".format(self.task_id), logs)
+        self.assertIn("base_task_runner.py", logs)
+        self.assert_log_line("Log from DAG Logger", logs_list)
+        self.assert_log_line("Log from TI Logger", logs_list)
+        self.assert_log_line("Log from Print statement", logs_list, expect_from_logging_mixin=True)
+
+        self.assertIn("INFO - Running: ['airflow', 'run', '{}', "
+                      "'{}', '{}',".format(self.dag_id, self.task_id, self.execution_date_str), logs)
+        self.assertIn("INFO - Marking task as SUCCESS.dag_id={}, task_id={}, "
+                      "execution_date=20170101T000000".format(self.dag_id, self.task_id), logs)
+
+
 @pytest.mark.integration("redis")
 @pytest.mark.integration("rabbitmq")
 class TestWorkerServeLogs(unittest.TestCase):
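To reproduce these checks locally, the new test class can be run on its own. A plausible
invocation from the repository root (exact tooling varies across 1.10 dev environments, so treat
this as a sketch):

    pytest tests/cli/test_cli.py::TestLogsfromTaskRunCommand -s

The -s flag disables pytest's output capture, so the print(logs) diagnostics in the tests are
shown as they run.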
diff --git a/tests/dags/test_logging_in_dag.py b/tests/dags/test_logging_in_dag.py
new file mode 100644
index 0000000..0659562
--- /dev/null
+++ b/tests/dags/test_logging_in_dag.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from airflow.models import DAG
+from airflow.operators.python_operator import PythonOperator
+from airflow.utils.timezone import datetime
+
+logger = logging.getLogger(__name__)
+
+
+def test_logging_fn(**kwargs):
+    logger.info("Log from DAG Logger")
+    kwargs["ti"].log.info("Log from TI Logger")
+    print("Log from Print statement")
+
+
+dag = DAG(
+    dag_id='test_logging_dag',
+    schedule_interval=None,
+    start_date=datetime(2016, 1, 1)
+)
+
+PythonOperator(
+    task_id='test_task',
+    python_callable=test_logging_fn,
+    provide_context=True,
+    dag=dag,
+)