Posted to commits@airflow.apache.org by ur...@apache.org on 2022/06/07 00:14:05 UTC

[airflow] branch main updated: Apply per-run log templates to log handlers (#24153)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c23826915d Apply per-run log templates to log handlers (#24153)
c23826915d is described below

commit c23826915dcdca4f22b52b74633336cb2f4a1eca
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Jun 7 08:13:55 2022 +0800

    Apply per-run log templates to log handlers (#24153)
---
 airflow/config_templates/airflow_local_settings.py |  1 -
 airflow/config_templates/default_test.cfg          |  1 -
 airflow/models/dagrun.py                           | 15 ++++-
 .../alibaba/cloud/log/oss_task_handler.py          |  2 +-
 .../amazon/aws/log/cloudwatch_task_handler.py      |  3 +-
 .../providers/amazon/aws/log/s3_task_handler.py    |  3 +-
 .../providers/elasticsearch/log/es_task_handler.py | 33 +++++++++--
 .../providers/google/cloud/log/gcs_task_handler.py |  2 +-
 .../microsoft/azure/log/wasb_task_handler.py       |  3 +-
 airflow/utils/log/file_task_handler.py             | 26 ++++++---
 airflow/utils/log/log_reader.py                    |  2 +-
 tests/api_connexion/endpoints/test_log_endpoint.py |  8 +--
 tests/conftest.py                                  | 21 +++++++
 .../alibaba/cloud/log/test_oss_task_handler.py     |  5 +-
 .../amazon/aws/log/test_cloudwatch_task_handler.py | 37 ++++++++----
 .../amazon/aws/log/test_s3_task_handler.py         | 43 +++++++++-----
 .../elasticsearch/log/test_es_task_handler.py      | 68 ++++++++++------------
 .../google/cloud/log/test_gcs_task_handler.py      |  8 +--
 .../microsoft/azure/log/test_wasb_task_handler.py  | 41 ++++++-------
 tests/task/task_runner/test_task_runner.py         |  1 +
 tests/utils/log/test_log_reader.py                 | 12 +++-
 tests/utils/test_log_handlers.py                   | 47 +++++++--------
 .../task_for_testing_log_view/1.log                |  1 +
 .../attempt=1.log                                  |  1 +
 tests/www/views/test_views_log.py                  |  3 -
 25 files changed, 232 insertions(+), 155 deletions(-)
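
The heart of the change: file-based log handlers no longer freeze the
[logging] log_filename_template config value at construction time. Instead,
FileTaskHandler._render_filename resolves the template through the task's
DagRun, which records which LogTemplate row was current when the run was
created, so runs started under an older template keep their original log
paths. A minimal sketch of the lookup, following the file_task_handler.py
hunk below (assumes an initialized metadatabase and a TaskInstance ti):

    from airflow.utils.session import create_session

    def resolve_filename_template(ti) -> str:
        # Each DagRun carries a log_template_id pointing at the LogTemplate
        # row that was current when the run was created.
        with create_session() as session:
            dag_run = ti.get_dagrun(session=session)
            return dag_run.get_log_template(session=session).filename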

diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index b2752c2be7..6684fd18e5 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -82,7 +82,6 @@ DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
             'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
             'formatter': 'airflow',
             'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
-            'filename_template': FILENAME_TEMPLATE,
             'filters': ['mask_secrets'],
         },
         'processor': {
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 2f9b6fa264..83260d0d52 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -54,7 +54,6 @@ base_log_folder = {AIRFLOW_HOME}/logs
 logging_level = INFO
 celery_logging_level = WARN
 fab_logging_level = WARN
-log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
 log_processor_filename_template = {{{{ filename }}}}.log
 dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log
 worker_log_server_port = 8793
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index eeec4d5b09..c66e24c536 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1065,14 +1065,23 @@ class DagRun(Base, LoggingMixin):
         return count
 
     @provide_session
-    def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str:
+    def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate:
         if self.log_template_id is None:  # DagRun created before LogTemplate introduction.
-            template = session.query(LogTemplate.filename).order_by(LogTemplate.id).limit(1).scalar()
+            template = session.query(LogTemplate).order_by(LogTemplate.id).first()
         else:
-            template = session.query(LogTemplate.filename).filter_by(id=self.log_template_id).scalar()
+            template = session.query(LogTemplate).get(self.log_template_id)
         if template is None:
             raise AirflowException(
                 f"No log_template entry found for ID {self.log_template_id!r}. "
                 f"Please make sure you set up the metadatabase correctly."
             )
         return template
+
+    @provide_session
+    def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str:
+        warnings.warn(
+            "This method is deprecated. Please use get_log_template instead.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.get_log_template(session=session).filename
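
get_log_filename_template survives as a deprecated shim so existing callers
keep working; new code should call get_log_template and pick the field it
needs. A hedged usage sketch, given a DagRun instance dag_run loaded from the
metadatabase:

    import warnings

    # Preferred: fetch the whole LogTemplate row once.
    template = dag_run.get_log_template()
    filename_tpl = template.filename        # used by FileTaskHandler
    es_id_tpl = template.elasticsearch_id   # used by the Elasticsearch handler

    # The deprecated shim still returns the filename, but warns.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        assert dag_run.get_log_filename_template() == filename_tpl
        assert issubclass(caught[-1].category, DeprecationWarning)
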
diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
index bb1d801ac9..393847f93a 100644
--- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py
+++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -38,7 +38,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
     uploads to and reads from OSS remote storage.
     """
 
-    def __init__(self, base_log_folder, oss_log_folder, filename_template):
+    def __init__(self, base_log_folder, oss_log_folder, filename_template=None):
         self.log.info("Using oss_task_handler for remote logging...")
         super().__init__(base_log_folder, filename_template)
         (self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder)
diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index c975a2cb83..7d4f81006b 100644
--- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -17,6 +17,7 @@
 # under the License.
 import sys
 from datetime import datetime
+from typing import Optional
 
 import watchtower
 
@@ -42,7 +43,7 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
     :param filename_template: template for file name (local storage) or log stream name (remote)
     """
 
-    def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: str):
+    def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: Optional[str] = None):
         super().__init__(base_log_folder, filename_template)
         split_arn = log_group_arn.split(':')
 
diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py
index ce3da88f16..cb8e0a2883 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -18,6 +18,7 @@
 import os
 import pathlib
 import sys
+from typing import Optional
 
 if sys.version_info >= (3, 8):
     from functools import cached_property
@@ -36,7 +37,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
     uploads to and reads from S3 remote storage.
     """
 
-    def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: str):
+    def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: Optional[str] = None):
         super().__init__(base_log_folder, filename_template)
         self.remote_base = s3_log_folder
         self.log_relative_path = ''
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 83c1163d80..64fce0df53 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -18,6 +18,7 @@
 
 import logging
 import sys
+import warnings
 from collections import defaultdict
 from datetime import datetime
 from operator import attrgetter
@@ -31,15 +32,22 @@ import pendulum
 from elasticsearch_dsl import Search
 
 from airflow.configuration import conf
-from airflow.models import TaskInstance
+from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.json_formatter import JSONFormatter
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+from airflow.utils.session import create_session
 
 # Elasticsearch hosted log type
 EsLogMsgType = List[Tuple[str, str]]
 
+# Compatibility: Airflow 2.3.3 and up uses this method, which reads the
+# LogTemplate model to look up the log ID template for the run. If this method
+# does not exist, the task handler should use the log_id_template attribute instead.
+USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
+
 
 class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
     """
@@ -65,8 +73,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
     def __init__(
         self,
         base_log_folder: str,
-        filename_template: str,
-        log_id_template: str,
         end_of_log_mark: str,
         write_stdout: bool,
         json_format: bool,
@@ -76,6 +82,9 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         host: str = "localhost:9200",
         frontend: str = "localhost:5601",
         es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"),
+        *,
+        filename_template: Optional[str] = None,
+        log_id_template: Optional[str] = None,
     ):
         """
         :param base_log_folder: base folder to store logs locally
@@ -88,7 +97,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
 
         self.client = elasticsearch.Elasticsearch([host], **es_kwargs)  # type: ignore[attr-defined]
 
-        self.log_id_template = log_id_template
+        if USE_PER_RUN_LOG_ID and log_id_template is not None:
+            warnings.warn(
+                "Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
+                DeprecationWarning,
+            )
+
+        self.log_id_template = log_id_template  # Only used on Airflow < 2.3.3.
         self.frontend = frontend
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark
@@ -103,7 +118,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         self.handler: Union[logging.FileHandler, logging.StreamHandler]  # type: ignore[assignment]
 
     def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
-        dag_run = ti.get_dagrun()
+        with create_session() as session:
+            dag_run = ti.get_dagrun(session=session)
+            if USE_PER_RUN_LOG_ID:
+                log_id_template = dag_run.get_log_template(session=session).elasticsearch_id
+            else:
+                log_id_template = self.log_id_template
+
         dag = ti.task.dag
         assert dag is not None  # For Mypy.
         try:
@@ -126,7 +147,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
                 data_interval_end = ""
             execution_date = dag_run.execution_date.isoformat()
 
-        return self.log_id_template.format(
+        return log_id_template.format(
             dag_id=ti.dag_id,
             task_id=ti.task_id,
             run_id=getattr(ti, "run_id", ""),
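
Because the Elasticsearch provider ships to multiple Airflow versions, it
feature-detects the new API instead of comparing version strings: if DagRun
has grown a get_log_template method, the per-run template wins; otherwise the
handler falls back to its own log_id_template argument. A sketch of that
dispatch, mirroring _render_log_id above:

    from airflow.models.dagrun import DagRun
    from airflow.utils.session import create_session

    USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")

    def log_id_template_for(ti, fallback_template):
        # fallback_template is the handler's log_id_template argument; it is
        # only consulted on Airflow versions predating get_log_template.
        with create_session() as session:
            dag_run = ti.get_dagrun(session=session)
            if USE_PER_RUN_LOG_ID:
                return dag_run.get_log_template(session=session).elasticsearch_id
        return fallback_template
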
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py
index 92d133d109..81f1426d75 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -67,7 +67,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         *,
         base_log_folder: str,
         gcs_log_folder: str,
-        filename_template: str,
+        filename_template: Optional[str] = None,
         gcp_key_path: Optional[str] = None,
         gcp_keyfile_dict: Optional[dict] = None,
         gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS,
diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index 9ec0cdf646..f5e89c2c21 100644
--- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -44,8 +44,9 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
         base_log_folder: str,
         wasb_log_folder: str,
         wasb_container: str,
-        filename_template: str,
         delete_local_copy: str,
+        *,
+        filename_template: Optional[str] = None,
     ) -> None:
         super().__init__(base_log_folder, filename_template)
         self.wasb_container = wasb_container
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index e0561991b2..db34ea5f6b 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -18,6 +18,7 @@
 """File logging handler for tasks."""
 import logging
 import os
+import warnings
 from datetime import datetime
 from pathlib import Path
 from typing import TYPE_CHECKING, Optional, Tuple
@@ -28,6 +29,7 @@ from airflow.configuration import AirflowConfigException, conf
 from airflow.utils.context import Context
 from airflow.utils.helpers import parse_template_string, render_template_to_string
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
+from airflow.utils.session import create_session
 
 if TYPE_CHECKING:
     from airflow.models import TaskInstance
@@ -44,11 +46,15 @@ class FileTaskHandler(logging.Handler):
     :param filename_template: template filename string
     """
 
-    def __init__(self, base_log_folder: str, filename_template: str):
+    def __init__(self, base_log_folder: str, filename_template: Optional[str] = None):
         super().__init__()
         self.handler: Optional[logging.FileHandler] = None
         self.local_base = base_log_folder
-        self.filename_template, self.filename_jinja_template = parse_template_string(filename_template)
+        if filename_template is not None:
+            warnings.warn(
+                "Passing filename_template to FileTaskHandler is deprecated and has no effect",
+                DeprecationWarning,
+            )
 
     def set_context(self, ti: "TaskInstance"):
         """
@@ -75,15 +81,19 @@ class FileTaskHandler(logging.Handler):
             self.handler.close()
 
     def _render_filename(self, ti: "TaskInstance", try_number: int) -> str:
-        if self.filename_jinja_template:
+        with create_session() as session:
+            dag_run = ti.get_dagrun(session=session)
+            template = dag_run.get_log_template(session=session).filename
+        str_tpl, jinja_tpl = parse_template_string(template)
+
+        if jinja_tpl:
             if hasattr(ti, "task"):
                 context = ti.get_template_context()
             else:
-                context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat())
+                context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
             context["try_number"] = try_number
-            return render_template_to_string(self.filename_jinja_template, context)
-        elif self.filename_template:
-            dag_run = ti.get_dagrun()
+            return render_template_to_string(jinja_tpl, context)
+        elif str_tpl:
             dag = ti.task.dag
             assert dag is not None  # For Mypy.
             try:
@@ -98,7 +108,7 @@ class FileTaskHandler(logging.Handler):
                 data_interval_end = data_interval[1].isoformat()
             else:
                 data_interval_end = ""
-            return self.filename_template.format(
+            return str_tpl.format(
                 dag_id=ti.dag_id,
                 task_id=ti.task_id,
                 run_id=ti.run_id,
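
With per-run resolution the template is parsed on every render:
parse_template_string (from airflow.utils.helpers) treats anything containing
"{{" as a Jinja template and everything else as a str.format template, which
is why the hunk above keeps two rendering branches. An illustrative sketch of
the two paths (the rendered value is an example, not taken from a real run):

    from airflow.utils.helpers import parse_template_string

    # format-style template: returned as a plain string for str.format().
    str_tpl, jinja_tpl = parse_template_string("{dag_id}/{task_id}/{run_id}/{try_number}.log")
    assert jinja_tpl is None
    str_tpl.format(dag_id="demo", task_id="t1", run_id="manual__2022-06-07", try_number=1)
    # -> 'demo/t1/manual__2022-06-07/1.log'

    # Jinja-style template: compiled to a jinja2 Template and rendered
    # from a task Context instead.
    str_tpl, jinja_tpl = parse_template_string("{{ ti.dag_id }}/{{ try_number }}.log")
    assert str_tpl is None
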
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index 396ab90a32..f241c22df1 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -121,6 +121,6 @@ class TaskLogReader:
         attachment_filename = render_log_filename(
             ti=ti,
             try_number="all" if try_number is None else try_number,
-            filename_template=dagrun.get_log_filename_template(session=session),
+            filename_template=dagrun.get_log_template(session=session).filename,
         )
         return attachment_filename
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py
index 614e1fa3a1..1b226be96f 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -99,7 +99,7 @@ class TestGetLog:
         self.ti.hostname = 'localhost'
 
     @pytest.fixture
-    def configure_loggers(self, tmp_path):
+    def configure_loggers(self, tmp_path, create_log_template):
         self.log_dir = tmp_path
 
         dir_path = tmp_path / self.DAG_ID / self.TASK_ID / self.default_time.replace(':', '.')
@@ -112,9 +112,9 @@ class TestGetLog:
         logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
         logging_config['handlers']['task']['base_log_folder'] = self.log_dir
 
-        logging_config['handlers']['task'][
-            'filename_template'
-        ] = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+        create_log_template(
+            '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+        )
 
         logging.config.dictConfig(logging_config)
 
diff --git a/tests/conftest.py b/tests/conftest.py
index 1328ea01d5..b20c0be494 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -34,6 +34,9 @@ os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
 os.environ["AWS_DEFAULT_REGION"] = os.environ.get("AWS_DEFAULT_REGION") or "us-east-1"
 os.environ["CREDENTIALS_DIR"] = os.environ.get('CREDENTIALS_DIR') or "/files/airflow-breeze-config/keys"
 
+from airflow import settings  # noqa: E402
+from airflow.models.tasklog import LogTemplate  # noqa: E402
+
 from tests.test_utils.perf.perf_kit.sqlalchemy import (  # noqa isort:skip
     count_queries,
     trace_queries,
@@ -769,3 +772,21 @@ def get_test_dag():
         return dag
 
     return _get
+
+
+@pytest.fixture()
+def create_log_template(request):
+    session = settings.Session()
+
+    def _create_log_template(filename_template, elasticsearch_id=""):
+        log_template = LogTemplate(filename=filename_template, elasticsearch_id=elasticsearch_id)
+        session.add(log_template)
+        session.commit()
+
+        def _delete_log_template():
+            session.delete(log_template)
+            session.commit()
+
+        request.addfinalizer(_delete_log_template)
+
+    return _create_log_template
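
The new create_log_template fixture is what lets the handler tests below drop
their filename_template constructor arguments: the template must now exist as
a LogTemplate row in the metadatabase. A hypothetical test sketching the
intended usage (fixture names as defined above; the expected path assumes the
format-style branch of FileTaskHandler._render_filename):

    from airflow.utils.log.file_task_handler import FileTaskHandler

    def test_render_uses_per_run_template(create_log_template, create_task_instance):
        # Seeds a LogTemplate row; the fixture's finalizer deletes it again.
        create_log_template("{dag_id}/{try_number}.log")
        ti = create_task_instance(dag_id="demo_dag", task_id="demo_task")
        assert FileTaskHandler("")._render_filename(ti, 1) == "demo_dag/1.log"
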
diff --git a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py
index 24eb73b92e..30e8cc32b9 100644
--- a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py
+++ b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py
@@ -35,10 +35,7 @@ class TestOSSTaskHandler(unittest.TestCase):
     def setUp(self):
         self.base_log_folder = 'local/airflow/logs/1.log'
         self.oss_log_folder = f'oss://{MOCK_BUCKET_NAME}/airflow/logs'
-        self.filename_template = '{try_number}.log'
-        self.oss_task_handler = OSSTaskHandler(
-            self.base_log_folder, self.oss_log_folder, self.filename_template
-        )
+        self.oss_task_handler = OSSTaskHandler(self.base_log_folder, self.oss_log_folder)
 
     @mock.patch(OSS_TASK_HANDLER_STRING.format('conf.get'))
     @mock.patch(OSS_TASK_HANDLER_STRING.format('OSSHook'))
diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
index dbd2ae28d5..8b23218c8c 100644
--- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -16,17 +16,18 @@
 # specific language governing permissions and limitations
 # under the License.
 import time
-import unittest
 from datetime import datetime as dt
 from unittest import mock
 from unittest.mock import ANY, call
 
+import pytest
 from watchtower import CloudWatchLogHandler
 
 from airflow.models import DAG, DagRun, TaskInstance
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
 from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudwatchTaskHandler
+from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.timezone import datetime
 from tests.test_utils.config import conf_vars
@@ -44,19 +45,24 @@ def get_time_str(time_in_milliseconds):
     return dt_time.strftime("%Y-%m-%d %H:%M:%S,000")
 
 
-@unittest.skipIf(mock_logs is None, "Skipping test because moto.mock_logs is not available")
-@mock_logs
-class TestCloudwatchTaskHandler(unittest.TestCase):
+@pytest.fixture(autouse=True, scope="module")
+def logmock():
+    with mock_logs():
+        yield
+
+
+@pytest.mark.skipif(mock_logs is None, reason="Skipping test because moto.mock_logs is not available")
+class TestCloudwatchTaskHandler:
     @conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'})
-    def setUp(self):
+    @pytest.fixture(autouse=True)
+    def setup(self, create_log_template):
         self.remote_log_group = 'log_group_name'
         self.region_name = 'us-west-2'
         self.local_log_location = 'local/log/location'
-        self.filename_template = '{dag_id}/{task_id}/{execution_date}/{try_number}.log'
+        create_log_template('{dag_id}/{task_id}/{execution_date}/{try_number}.log')
         self.cloudwatch_task_handler = CloudwatchTaskHandler(
             self.local_log_location,
             f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
-            self.filename_template,
         )
         self.cloudwatch_task_handler.hook
 
@@ -65,21 +71,29 @@ class TestCloudwatchTaskHandler(unittest.TestCase):
         task_id = 'task_for_testing_cloudwatch_log_handler'
         self.dag = DAG(dag_id=dag_id, start_date=date)
         task = EmptyOperator(task_id=task_id, dag=self.dag)
-        dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test")
-        self.ti = TaskInstance(task=task)
+        dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="scheduled")
+        with create_session() as session:
+            session.add(dag_run)
+            session.commit()
+            session.refresh(dag_run)
+
+        self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
         self.ti.dag_run = dag_run
         self.ti.try_number = 1
         self.ti.state = State.RUNNING
 
-        self.remote_log_stream = f'{dag_id}/{task_id}/{date.isoformat()}/{self.ti.try_number}.log'.replace(
+        self.remote_log_stream = (f'{dag_id}/{task_id}/{date.isoformat()}/{self.ti.try_number}.log').replace(
             ':', '_'
         )
 
         moto.moto_api._internal.models.moto_api_backend.reset()
         self.conn = boto3.client('logs', region_name=self.region_name)
 
-    def tearDown(self):
+        yield
+
         self.cloudwatch_task_handler.handler = None
+        with create_session() as session:
+            session.query(DagRun).delete()
 
     def test_hook(self):
         assert isinstance(self.cloudwatch_task_handler.hook, AwsLogsHook)
@@ -89,7 +103,6 @@ class TestCloudwatchTaskHandler(unittest.TestCase):
         handler = CloudwatchTaskHandler(
             self.local_log_location,
             f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
-            self.filename_template,
         )
 
         with mock.patch.object(handler.log, 'error') as mock_error:
diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py
index b647f95ba4..958ded562e 100644
--- a/tests/providers/amazon/aws/log/test_s3_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py
@@ -18,7 +18,6 @@
 
 import contextlib
 import os
-import unittest
 from unittest import mock
 from unittest.mock import ANY
 
@@ -29,6 +28,7 @@ from airflow.models import DAG, DagRun, TaskInstance
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler
+from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.timezone import datetime
 from tests.test_utils.config import conf_vars
@@ -41,32 +41,39 @@ except ImportError:
     mock_s3 = None
 
 
-@unittest.skipIf(mock_s3 is None, "Skipping test because moto.mock_s3 is not available")
-@mock_s3
-class TestS3TaskHandler(unittest.TestCase):
+@pytest.fixture(autouse=True, scope="module")
+def s3mock():
+    with mock_s3():
+        yield
+
+
+@pytest.mark.skipif(mock_s3 is None, reason="Skipping test because moto.mock_s3 is not available")
+class TestS3TaskHandler:
     @conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'})
-    def setUp(self):
-        super().setUp()
+    @pytest.fixture(autouse=True)
+    def setup(self, create_log_template):
         self.remote_log_base = 's3://bucket/remote/log/location'
         self.remote_log_location = 's3://bucket/remote/log/location/1.log'
         self.remote_log_key = 'remote/log/location/1.log'
         self.local_log_location = 'local/log/location'
-        self.filename_template = '{try_number}.log'
-        self.s3_task_handler = S3TaskHandler(
-            self.local_log_location, self.remote_log_base, self.filename_template
-        )
+        create_log_template('{try_number}.log')
+        self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
         # Verify the hook now with the config override
         assert self.s3_task_handler.hook is not None
 
         date = datetime(2016, 1, 1)
         self.dag = DAG('dag_for_testing_s3_task_handler', start_date=date)
         task = EmptyOperator(task_id='task_for_testing_s3_log_handler', dag=self.dag)
-        dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test")
-        self.ti = TaskInstance(task=task)
+        dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="manual")
+        with create_session() as session:
+            session.add(dag_run)
+            session.commit()
+            session.refresh(dag_run)
+
+        self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
         self.ti.dag_run = dag_run
         self.ti.try_number = 1
         self.ti.state = State.RUNNING
-        self.addCleanup(self.dag.clear)
 
         self.conn = boto3.client('s3')
         # We need to create the bucket since this is all in Moto's 'virtual'
@@ -74,7 +81,13 @@ class TestS3TaskHandler(unittest.TestCase):
         moto.moto_api._internal.models.moto_api_backend.reset()
         self.conn.create_bucket(Bucket="bucket")
 
-    def tearDown(self):
+        yield
+
+        self.dag.clear()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+
         if self.s3_task_handler.handler:
             with contextlib.suppress(Exception):
                 os.remove(self.s3_task_handler.handler.baseFilename)
@@ -85,7 +98,7 @@ class TestS3TaskHandler(unittest.TestCase):
 
     @conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'})
     def test_hook_raises(self):
-        handler = S3TaskHandler(self.local_log_location, self.remote_log_base, self.filename_template)
+        handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
         with mock.patch.object(handler.log, 'error') as mock_error:
             with mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook") as mock_hook:
                 mock_hook.side_effect = Exception('Failed to connect')
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 03eab3dbb7..e26a78fe77 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -58,9 +58,11 @@ class TestElasticsearchTaskHandler:
     EXECUTION_DATE = datetime(2016, 1, 1)
     LOG_ID = f'{DAG_ID}-{TASK_ID}-2016-01-01T00:00:00+00:00-1'
     JSON_LOG_ID = f'{DAG_ID}-{TASK_ID}-{ElasticsearchTaskHandler._clean_date(EXECUTION_DATE)}-1'
+    FILENAME_TEMPLATE = '{try_number}.log'
 
     @pytest.fixture()
-    def ti(self, create_task_instance):
+    def ti(self, create_task_instance, create_log_template):
+        create_log_template(self.FILENAME_TEMPLATE, '{dag_id}-{task_id}-{execution_date}-{try_number}')
         yield get_ti(
             dag_id=self.DAG_ID,
             task_id=self.TASK_ID,
@@ -73,8 +75,6 @@ class TestElasticsearchTaskHandler:
     @elasticmock
     def setup(self):
         self.local_log_location = 'local/log/location'
-        self.filename_template = '{try_number}.log'
-        self.log_id_template = '{dag_id}-{task_id}-{execution_date}-{try_number}'
         self.end_of_log_mark = 'end_of_log\n'
         self.write_stdout = False
         self.json_format = False
@@ -82,15 +82,13 @@ class TestElasticsearchTaskHandler:
         self.host_field = 'host'
         self.offset_field = 'offset'
         self.es_task_handler = ElasticsearchTaskHandler(
-            self.local_log_location,
-            self.filename_template,
-            self.log_id_template,
-            self.end_of_log_mark,
-            self.write_stdout,
-            self.json_format,
-            self.json_fields,
-            self.host_field,
-            self.offset_field,
+            base_log_folder=self.local_log_location,
+            end_of_log_mark=self.end_of_log_mark,
+            write_stdout=self.write_stdout,
+            json_format=self.json_format,
+            json_fields=self.json_fields,
+            host_field=self.host_field,
+            offset_field=self.offset_field,
         )
 
         self.es = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])
@@ -115,15 +113,13 @@ class TestElasticsearchTaskHandler:
         assert es_conf == expected_dict
         # ensure creating with configs does not fail
         ElasticsearchTaskHandler(
-            self.local_log_location,
-            self.filename_template,
-            self.log_id_template,
-            self.end_of_log_mark,
-            self.write_stdout,
-            self.json_format,
-            self.json_fields,
-            self.host_field,
-            self.offset_field,
+            base_log_folder=self.local_log_location,
+            end_of_log_mark=self.end_of_log_mark,
+            write_stdout=self.write_stdout,
+            json_format=self.json_format,
+            json_fields=self.json_fields,
+            host_field=self.host_field,
+            offset_field=self.offset_field,
             es_kwargs=es_conf,
         )
 
@@ -395,7 +391,7 @@ class TestElasticsearchTaskHandler:
         self.es_task_handler.set_context(ti)
         self.es_task_handler.close()
         with open(
-            os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+            os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
         ) as log_file:
             # end_of_log_mark may contain characters like '\n' which is needed to
             # have the log uploaded but will not be stored in elasticsearch.
@@ -409,7 +405,7 @@ class TestElasticsearchTaskHandler:
         self.es_task_handler.set_context(ti)
         self.es_task_handler.close()
         with open(
-            os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+            os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
         ) as log_file:
             assert self.end_of_log_mark not in log_file.read()
         assert self.es_task_handler.closed
@@ -419,7 +415,7 @@ class TestElasticsearchTaskHandler:
         self.es_task_handler.set_context(ti)
         self.es_task_handler.close()
         with open(
-            os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+            os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
         ) as log_file:
             assert 0 == len(log_file.read())
 
@@ -428,7 +424,7 @@ class TestElasticsearchTaskHandler:
         self.es_task_handler.handler = None
         self.es_task_handler.close()
         with open(
-            os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+            os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
         ) as log_file:
             assert 0 == len(log_file.read())
         assert self.es_task_handler.closed
@@ -438,7 +434,7 @@ class TestElasticsearchTaskHandler:
         self.es_task_handler.handler.stream = None
         self.es_task_handler.close()
         with open(
-            os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+            os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
         ) as log_file:
             assert self.end_of_log_mark in log_file.read()
         assert self.es_task_handler.closed
@@ -447,7 +443,7 @@ class TestElasticsearchTaskHandler:
         self.es_task_handler.handler.stream.close()
         self.es_task_handler.close()
         with open(
-            os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+            os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
         ) as log_file:
             assert self.end_of_log_mark in log_file.read()
         assert self.es_task_handler.closed
@@ -478,15 +474,13 @@ class TestElasticsearchTaskHandler:
     )
     def test_get_external_log_url(self, ti, json_format, es_frontend, expected_url):
         es_task_handler = ElasticsearchTaskHandler(
-            self.local_log_location,
-            self.filename_template,
-            self.log_id_template,
-            self.end_of_log_mark,
-            self.write_stdout,
-            json_format,
-            self.json_fields,
-            self.host_field,
-            self.offset_field,
+            base_log_folder=self.local_log_location,
+            end_of_log_mark=self.end_of_log_mark,
+            write_stdout=self.write_stdout,
+            json_format=json_format,
+            json_fields=self.json_fields,
+            host_field=self.host_field,
+            offset_field=self.offset_field,
             frontend=es_frontend,
         )
         url = es_task_handler.get_external_log_url(ti, ti.try_number)
@@ -508,8 +502,6 @@ class TestElasticsearchTaskHandler:
         # arrange
         handler = ElasticsearchTaskHandler(
             base_log_folder=self.local_log_location,
-            filename_template=self.filename_template,
-            log_id_template=self.log_id_template,
             end_of_log_mark=self.end_of_log_mark,
             write_stdout=True,
             json_format=True,
diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py
index 6517be8f31..b443a9f8ec 100644
--- a/tests/providers/google/cloud/log/test_gcs_task_handler.py
+++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py
@@ -49,13 +49,11 @@ class TestGCSTaskHandler:
             yield td
 
     @pytest.fixture(autouse=True)
-    def gcs_task_handler(self, local_log_location):
-        self.remote_log_base = "gs://bucket/remote/log/location"
-        self.filename_template = "{try_number}.log"
+    def gcs_task_handler(self, create_log_template, local_log_location):
+        create_log_template("{try_number}.log")
         self.gcs_task_handler = GCSTaskHandler(
             base_log_folder=local_log_location,
-            gcs_log_folder=self.remote_log_base,
-            filename_template=self.filename_template,
+            gcs_log_folder="gs://bucket/remote/log/location",
         )
         yield self.gcs_task_handler
 
diff --git a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
index 4fe9676717..3c92aa78aa 100644
--- a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
+++ b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
@@ -22,23 +22,25 @@ from azure.common import AzureHttpError
 
 from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
 from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbTaskHandler
-from airflow.utils.state import State
+from airflow.utils.state import TaskInstanceState
 from airflow.utils.timezone import datetime
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_dags, clear_db_runs
 
+DEFAULT_DATE = datetime(2020, 8, 10)
+
 
 class TestWasbTaskHandler:
     @pytest.fixture(autouse=True)
-    def ti(self, create_task_instance):
-        date = datetime(2020, 8, 10)
+    def ti(self, create_task_instance, create_log_template):
+        create_log_template("{try_number}.log")
         ti = create_task_instance(
             dag_id='dag_for_testing_wasb_task_handler',
             task_id='task_for_testing_wasb_log_handler',
-            execution_date=date,
-            start_date=date,
-            dagrun_state=State.RUNNING,
-            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            dagrun_state=TaskInstanceState.RUNNING,
+            state=TaskInstanceState.RUNNING,
         )
         ti.try_number = 1
         ti.hostname = 'localhost'
@@ -52,12 +54,10 @@ class TestWasbTaskHandler:
         self.remote_log_location = 'remote/log/location/1.log'
         self.local_log_location = 'local/log/location'
         self.container_name = "wasb-container"
-        self.filename_template = '{try_number}.log'
         self.wasb_task_handler = WasbTaskHandler(
             base_log_folder=self.local_log_location,
             wasb_log_folder=self.wasb_log_folder,
             wasb_container=self.container_name,
-            filename_template=self.filename_template,
             delete_local_copy=True,
         )
 
@@ -68,9 +68,7 @@ class TestWasbTaskHandler:
 
     @conf_vars({('logging', 'remote_log_conn_id'): 'wasb_default'})
     def test_hook_raises(self):
-        handler = WasbTaskHandler(
-            self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True
-        )
+        handler = self.wasb_task_handler
         with mock.patch.object(handler.log, 'error') as mock_error:
             with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook:
                 mock_hook.side_effect = AzureHttpError("failed to connect", 404)
@@ -120,15 +118,14 @@ class TestWasbTaskHandler:
             [{'end_of_log': True}],
         )
 
-    def test_wasb_read_raises(self):
-        handler = WasbTaskHandler(
-            self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True
-        )
+    @mock.patch(
+        "airflow.providers.microsoft.azure.hooks.wasb.WasbHook",
+        **{"return_value.read_file.side_effect": AzureHttpError("failed to connect", 404)},
+    )
+    def test_wasb_read_raises(self, mock_hook):
+        handler = self.wasb_task_handler
         with mock.patch.object(handler.log, 'error') as mock_error:
-            with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook:
-                mock_hook.return_value.read_file.side_effect = AzureHttpError("failed to connect", 404)
-
-                handler.wasb_read(self.remote_log_location, return_error=True)
+            handler.wasb_read(self.remote_log_location, return_error=True)
             mock_error.assert_called_once_with(
                 'Could not read logs from remote/log/location/1.log',
                 exc_info=True,
@@ -164,9 +161,7 @@ class TestWasbTaskHandler:
         )
 
     def test_write_raises(self):
-        handler = WasbTaskHandler(
-            self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True
-        )
+        handler = self.wasb_task_handler
         with mock.patch.object(handler.log, 'error') as mock_error:
             with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook:
                 mock_hook.return_value.load_string.side_effect = AzureHttpError("failed to connect", 404)
diff --git a/tests/task/task_runner/test_task_runner.py b/tests/task/task_runner/test_task_runner.py
index ab140e05f5..fc5f3cc894 100644
--- a/tests/task/task_runner/test_task_runner.py
+++ b/tests/task/task_runner/test_task_runner.py
@@ -36,6 +36,7 @@ class GetTaskRunner(unittest.TestCase):
     def test_should_support_core_task_runner(self, mock_subprocess):
         ti = mock.MagicMock(map_index=-1, run_as_user=None)
         ti.get_template_context.return_value = {"ti": ti}
+        ti.get_dagrun.return_value.get_log_template.return_value.filename = "blah"
         local_task_job = mock.MagicMock(task_instance=ti)
         task_runner = get_task_runner(local_task_job)
 
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index 4e6e942741..9a76ada725 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -29,6 +29,7 @@ import pytest
 from airflow import settings
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.models import DagRun
+from airflow.models.tasklog import LogTemplate
 from airflow.operators.python import PythonOperator
 from airflow.timetables.base import DataInterval
 from airflow.utils import timezone
@@ -44,6 +45,7 @@ class TestLogView:
     DAG_ID = "dag_log_reader"
     TASK_ID = "task_log_reader"
     DEFAULT_DATE = timezone.datetime(2017, 9, 1)
+    FILENAME_TEMPLATE = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(':', '.') }}/{{ try_number }}.log"
 
     @pytest.fixture(autouse=True)
     def log_dir(self):
@@ -70,9 +72,7 @@ class TestLogView:
     def configure_loggers(self, log_dir, settings_folder):
         logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
         logging_config["handlers"]["task"]["base_log_folder"] = log_dir
-        logging_config["handlers"]["task"][
-            "filename_template"
-        ] = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(':', '.') }}/{{ try_number }}.log"
+        logging_config["handlers"]["task"]["filename_template"] = self.FILENAME_TEMPLATE
         settings_file = os.path.join(settings_folder, "airflow_local_settings.py")
         with open(settings_file, "w") as handle:
             new_logging_file = f"LOGGING_CONFIG = {logging_config}"
@@ -93,6 +93,10 @@ class TestLogView:
 
     @pytest.fixture(autouse=True)
     def prepare_db(self, create_task_instance):
+        session = settings.Session()
+        log_template = LogTemplate(filename=self.FILENAME_TEMPLATE, elasticsearch_id="")
+        session.add(log_template)
+        session.commit()
         ti = create_task_instance(
             dag_id=self.DAG_ID,
             task_id=self.TASK_ID,
@@ -107,6 +111,8 @@ class TestLogView:
         yield
         clear_db_runs()
         clear_db_dags()
+        session.delete(log_template)
+        session.commit()
 
     def test_test_read_log_chunks_should_read_one_try(self):
         task_log_reader = TaskLogReader()
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index f4b4f7b2e3..28b9c7cf1a 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -21,8 +21,6 @@ import logging.config
 import os
 import re
 
-import pytest
-
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
 from airflow.models import DAG, DagRun, TaskInstance
 from airflow.operators.python import PythonOperator
@@ -218,34 +216,37 @@ class TestFileTaskLogHandler:
         os.remove(log_filename)
 
 
-@pytest.fixture()
-def filename_rendering_ti(session, create_task_instance):
-    return create_task_instance(
-        dag_id='dag_for_testing_filename_rendering',
-        task_id='task_for_testing_filename_rendering',
-        run_type=DagRunType.SCHEDULED,
-        execution_date=DEFAULT_DATE,
-        session=session,
-    )
-
-
 class TestFilenameRendering:
-    def test_python_formatting(self, filename_rendering_ti):
-        expected_filename = (
-            f'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/'
-            f'{DEFAULT_DATE.isoformat()}/42.log'
+    def test_python_formatting(self, create_log_template, create_task_instance):
+        create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
+        filename_rendering_ti = create_task_instance(
+            dag_id="dag_for_testing_filename_rendering",
+            task_id="task_for_testing_filename_rendering",
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
         )
 
-        fth = FileTaskHandler('', '{dag_id}/{task_id}/{execution_date}/{try_number}.log')
+        expected_filename = (
+            f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
+            f"{DEFAULT_DATE.isoformat()}/42.log"
+        )
+        fth = FileTaskHandler("")
         rendered_filename = fth._render_filename(filename_rendering_ti, 42)
         assert expected_filename == rendered_filename
 
-    def test_jinja_rendering(self, filename_rendering_ti):
-        expected_filename = (
-            f'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/'
-            f'{DEFAULT_DATE.isoformat()}/42.log'
+    def test_jinja_rendering(self, create_log_template, create_task_instance):
+        create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")
+        filename_rendering_ti = create_task_instance(
+            dag_id="dag_for_testing_filename_rendering",
+            task_id="task_for_testing_filename_rendering",
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
         )
 
-        fth = FileTaskHandler('', '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log')
+        expected_filename = (
+            f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
+            f"{DEFAULT_DATE.isoformat()}/42.log"
+        )
+        fth = FileTaskHandler("")
         rendered_filename = fth._render_filename(filename_rendering_ti, 42)
         assert expected_filename == rendered_filename
diff --git a/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log b/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log
new file mode 100644
index 0000000000..bc10ef7880
--- /dev/null
+++ b/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log
@@ -0,0 +1 @@
+Log for testing.
diff --git a/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log b/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log
new file mode 100644
index 0000000000..bc10ef7880
--- /dev/null
+++ b/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log
@@ -0,0 +1 @@
+Log for testing.
diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py
index f697cd3772..82b30f9d21 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -85,9 +85,6 @@ def log_app(backup_modules):
     logging_config['handlers']['task']['base_log_folder'] = str(
         pathlib.Path(__file__, "..", "..", "test_logs").resolve(),
     )
-    logging_config['handlers']['task'][
-        'filename_template'
-    ] = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log'
 
     with tempfile.TemporaryDirectory() as settings_dir:
         local_settings = pathlib.Path(settings_dir, "airflow_local_settings.py")