You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/06/29 13:21:43 UTC
[airflow] 37/37: Fix logging issue when running tasks (#9363)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 845bcbefa57b6ed12be337128073e1d766910030
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Thu Jun 25 13:32:12 2020 +0100
Fix logging issue when running tasks (#9363)
(cherry picked from commit 61f4e9e82a0ac0680971eb6298f089a16c32ceb9)
---
airflow/bin/cli.py | 31 +++++++-
airflow/settings.py | 7 ++
airflow/task/task_runner/standard_task_runner.py | 6 +-
tests/cli/test_cli.py | 93 ++++++++++++++++++++++++
tests/dags/test_logging_in_dag.py | 44 +++++++++++
5 files changed, 178 insertions(+), 3 deletions(-)
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e70ccdd..b3ffea7 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -555,8 +555,35 @@ def run(args, dag=None):
if args.interactive:
_run(args, dag, ti)
else:
- with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
- _run(args, dag, ti)
+ if settings.DONOT_MODIFY_HANDLERS:
+ with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
+ _run(args, dag, ti)
+ else:
+ # Get all the Handlers from 'airflow.task' logger
+ # Add these handlers to the root logger so that we can get logs from
+ # any custom loggers defined in the DAG
+ airflow_logger_handlers = logging.getLogger('airflow.task').handlers
+ root_logger = logging.getLogger()
+ root_logger_handlers = root_logger.handlers
+
+ # Remove all handlers from Root Logger to avoid duplicate logs
+ for handler in root_logger_handlers:
+ root_logger.removeHandler(handler)
+
+ for handler in airflow_logger_handlers:
+ root_logger.addHandler(handler)
+ root_logger.setLevel(logging.getLogger('airflow.task').level)
+
+ with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
+ _run(args, dag, ti)
+
+ # We need to restore the handlers to the loggers as celery worker process
+ # can call this command multiple times,
+ # so if we don't reset this then logs from next task would go to the wrong place
+ for handler in airflow_logger_handlers:
+ root_logger.removeHandler(handler)
+ for handler in root_logger_handlers:
+ root_logger.addHandler(handler)
logging.shutdown()
diff --git a/airflow/settings.py b/airflow/settings.py
index bf2fb1d..513f192 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -400,3 +400,10 @@ STORE_SERIALIZED_DAGS = conf.getboolean('core', 'store_serialized_dags', fallbac
# write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
'core', 'min_serialized_dag_update_interval', fallback=30)
+
+# If donot_modify_handlers=True, we do not modify logging handlers in task_run command
+# If the flag is set to False, we remove all handlers from the root logger
+# and add all handlers from 'airflow.task' logger to the root Logger. This is done
+# to get all the logs from the print & log statements in the DAG files before a task is run
+# The handlers are restored after the task completes execution.
+DONOT_MODIFY_HANDLERS = conf.getboolean('logging', 'donot_modify_handlers', fallback=False)
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 4557af2..8138cfa 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.
+"""Standard task runner"""
import os
import psutil
@@ -25,7 +26,7 @@ from setproctitle import setproctitle
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
from airflow.utils.helpers import reap_process_group
-CAN_FORK = hasattr(os, 'fork')
+CAN_FORK = hasattr(os, "fork")
class StandardTaskRunner(BaseTaskRunner):
@@ -73,6 +74,9 @@ class StandardTaskRunner(BaseTaskRunner):
# [1:] - remove "airflow" from the start of the command
args = parser.parse_args(self._command[1:])
+ self.log.info('Running: %s', self._command)
+ self.log.info('Job %s: Subtask %s', self._task_instance.job_id, self._task_instance.task_id)
+
proc_title = "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date}"
if hasattr(args, "job_id"):
proc_title += " {0.job_id}"
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index e7523dc..a2a81ac 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -22,6 +22,7 @@ import io
import logging
import os
+from airflow.configuration import conf
from parameterized import parameterized
from six import StringIO, PY2
import sys
@@ -530,6 +531,98 @@ class TestCLI(unittest.TestCase):
)
+class TestLogsfromTaskRunCommand(unittest.TestCase):
+
+ def setUp(self):
+ self.dag_id = "test_logging_dag"
+ self.task_id = "test_task"
+ reset(self.dag_id)
+ self.execution_date_str = timezone.make_aware(datetime(2017, 1, 1)).isoformat()
+ self.log_dir = conf.get('core', 'base_log_folder')
+ self.log_filename = "{}/{}/{}/1.log".format(self.dag_id, self.task_id, self.execution_date_str)
+ self.ti_log_file_path = os.path.join(self.log_dir, self.log_filename)
+ self.parser = cli.CLIFactory.get_parser()
+ try:
+ os.remove(self.ti_log_file_path)
+ except OSError:
+ pass
+
+ def tearDown(self):
+ reset(self.dag_id)
+ try:
+ os.remove(self.ti_log_file_path)
+ except OSError:
+ pass
+
+ def assert_log_line(self, text, logs_list, expect_from_logging_mixin=False):
+ """
+ Get Log Line and assert only 1 Entry exists with the given text. Also check that
+ "logging_mixin" line does not appear in that log line to avoid duplicate logging as below:
+ [2020-06-24 16:47:23,537] {logging_mixin.py:91} INFO - [2020-06-24 16:47:23,536] {python.py:135}
+ """
+ log_lines = [log for log in logs_list if text in log]
+ self.assertEqual(len(log_lines), 1)
+ log_line = log_lines[0]
+ if not expect_from_logging_mixin:
+ # Logs from print statement still show with logging_mixin as filename
+ # Example: [2020-06-24 17:07:00,482] {logging_mixin.py:91} INFO - Log from Print statement
+ self.assertNotIn("logging_mixin.py", log_line)
+ return log_line
+
+ @unittest.skipIf(not hasattr(os, 'fork'), "Forking not available")
+ def test_logging_with_run_task(self):
+ # We are not using self.assertLogs as we want to verify what actually is stored in the Log file
+ # as that is what gets displayed
+
+ with conf_vars({('core', 'dags_folder'): os.path.join(TEST_DAG_FOLDER, self.dag_id)}):
+ cli.run(self.parser.parse_args([
+ 'run', self.dag_id, self.task_id, '--local', self.execution_date_str]))
+
+ with open(self.ti_log_file_path) as l_file:
+ logs = l_file.read()
+
+ print(logs) # In case of a test failure, this line would show the detailed log
+ logs_list = logs.splitlines()
+
+ self.assertIn("INFO - Started process", logs)
+ self.assertIn("Subtask {}".format(self.task_id), logs)
+ self.assertIn("standard_task_runner.py", logs)
+ self.assertIn("INFO - Running: ['airflow', 'run', '{}', "
+ "'{}', '{}',".format(self.dag_id, self.task_id, self.execution_date_str), logs)
+
+ self.assert_log_line("Log from DAG Logger", logs_list)
+ self.assert_log_line("Log from TI Logger", logs_list)
+ self.assert_log_line("Log from Print statement", logs_list, expect_from_logging_mixin=True)
+
+ self.assertIn("INFO - Marking task as SUCCESS.dag_id={}, task_id={}, "
+ "execution_date=20170101T000000".format(self.dag_id, self.task_id), logs)
+
+ @mock.patch("airflow.task.task_runner.standard_task_runner.CAN_FORK", False)
+ def test_logging_with_run_task_subprocess(self):
+ # We are not using self.assertLogs as we want to verify what actually is stored in the Log file
+ # as that is what gets displayed
+ with conf_vars({('core', 'dags_folder'): os.path.join(TEST_DAG_FOLDER, self.dag_id)}):
+ cli.run(self.parser.parse_args([
+ 'run', self.dag_id, self.task_id, '--local', self.execution_date_str]))
+
+ with open(self.ti_log_file_path) as l_file:
+ logs = l_file.read()
+
+ print(logs) # In case of a test failure, this line would show the detailed log
+ logs_list = logs.splitlines()
+
+ self.assertIn("Subtask {}".format(self.task_id), logs)
+ self.assertIn("base_task_runner.py", logs)
+ self.assert_log_line("Log from DAG Logger", logs_list)
+ self.assert_log_line("Log from TI Logger", logs_list)
+ self.assert_log_line("Log from Print statement", logs_list, expect_from_logging_mixin=True)
+
+ self.assertIn("INFO - Running: ['airflow', 'run', '{}', "
+ "'{}', '{}',".format(self.dag_id, self.task_id, self.execution_date_str), logs)
+ self.assertIn("INFO - Marking task as SUCCESS.dag_id={}, task_id={}, "
+ "execution_date=20170101T000000".format(self.dag_id, self.task_id), logs)
+
+
@pytest.mark.integration("redis")
@pytest.mark.integration("rabbitmq")
class TestWorkerServeLogs(unittest.TestCase):
diff --git a/tests/dags/test_logging_in_dag.py b/tests/dags/test_logging_in_dag.py
new file mode 100644
index 0000000..0659562
--- /dev/null
+++ b/tests/dags/test_logging_in_dag.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+from airflow.models import DAG
+from airflow.operators.python_operator import PythonOperator
+from airflow.utils.timezone import datetime
+
+logger = logging.getLogger(__name__)
+
+
+def test_logging_fn(**kwargs):
+ logger.info("Log from DAG Logger")
+ kwargs["ti"].log.info("Log from TI Logger")
+ print("Log from Print statement")
+
+
+dag = DAG(
+ dag_id='test_logging_dag',
+ schedule_interval=None,
+ start_date=datetime(2016, 1, 1)
+)
+
+PythonOperator(
+ task_id='test_task',
+ python_callable=test_logging_fn,
+ provide_context=True,
+ dag=dag,
+)