You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/30 14:03:29 UTC
[airflow] 02/14: Apply per-run log templates to log handlers (#24153)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit bea9e656bb3dda8ac7f1ad31b198b7e4e00a3b79
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Jun 7 08:13:55 2022 +0800
Apply per-run log templates to log handlers (#24153)
(cherry picked from commit c23826915dcdca4f22b52b74633336cb2f4a1eca)
---
airflow/config_templates/airflow_local_settings.py | 1 -
airflow/config_templates/default_test.cfg | 1 -
airflow/models/dagrun.py | 15 ++++-
.../alibaba/cloud/log/oss_task_handler.py | 2 +-
.../amazon/aws/log/cloudwatch_task_handler.py | 3 +-
.../providers/amazon/aws/log/s3_task_handler.py | 3 +-
.../providers/elasticsearch/log/es_task_handler.py | 33 +++++++++--
.../providers/google/cloud/log/gcs_task_handler.py | 2 +-
.../microsoft/azure/log/wasb_task_handler.py | 3 +-
airflow/utils/log/file_task_handler.py | 26 ++++++---
airflow/utils/log/log_reader.py | 2 +-
tests/api_connexion/endpoints/test_log_endpoint.py | 8 +--
tests/conftest.py | 39 +++++++++++++
.../alibaba/cloud/log/test_oss_task_handler.py | 5 +-
.../amazon/aws/log/test_cloudwatch_task_handler.py | 37 ++++++++----
.../amazon/aws/log/test_s3_task_handler.py | 43 +++++++++-----
.../elasticsearch/log/test_es_task_handler.py | 68 ++++++++++------------
.../google/cloud/log/test_gcs_task_handler.py | 8 +--
.../microsoft/azure/log/test_wasb_task_handler.py | 41 ++++++-------
tests/task/task_runner/test_task_runner.py | 1 +
tests/utils/log/test_log_reader.py | 12 +++-
tests/utils/test_log_handlers.py | 47 +++++++--------
.../task_for_testing_log_view/1.log | 1 +
.../attempt=1.log | 1 +
tests/www/views/test_views_log.py | 3 -
25 files changed, 250 insertions(+), 155 deletions(-)
diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index b2752c2be7..6684fd18e5 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -82,7 +82,6 @@ DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
- 'filename_template': FILENAME_TEMPLATE,
'filters': ['mask_secrets'],
},
'processor': {
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 2f9b6fa264..83260d0d52 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -54,7 +54,6 @@ base_log_folder = {AIRFLOW_HOME}/logs
logging_level = INFO
celery_logging_level = WARN
fab_logging_level = WARN
-log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log
log_processor_filename_template = {{{{ filename }}}}.log
dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log
worker_log_server_port = 8793
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index d9c4eeb726..ad0dcdfebd 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1209,14 +1209,23 @@ class DagRun(Base, LoggingMixin):
return count
@provide_session
- def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str:
+ def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate:
if self.log_template_id is None: # DagRun created before LogTemplate introduction.
- template = session.query(LogTemplate.filename).order_by(LogTemplate.id).limit(1).scalar()
+ template = session.query(LogTemplate).order_by(LogTemplate.id).first()
else:
- template = session.query(LogTemplate.filename).filter_by(id=self.log_template_id).scalar()
+ template = session.query(LogTemplate).get(self.log_template_id)
if template is None:
raise AirflowException(
f"No log_template entry found for ID {self.log_template_id!r}. "
f"Please make sure you set up the metadatabase correctly."
)
return template
+
+ @provide_session
+ def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str:
+ warnings.warn(
+ "This method is deprecated. Please use get_log_template instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ return self.get_log_template(session=session).filename
diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
index d26bfbfd04..ec61972ffc 100644
--- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py
+++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -36,7 +36,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
uploads to and reads from OSS remote storage.
"""
- def __init__(self, base_log_folder, oss_log_folder, filename_template):
+ def __init__(self, base_log_folder, oss_log_folder, filename_template=None):
self.log.info("Using oss_task_handler for remote logging...")
super().__init__(base_log_folder, filename_template)
(self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder)
diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index c975a2cb83..7d4f81006b 100644
--- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -17,6 +17,7 @@
# under the License.
import sys
from datetime import datetime
+from typing import Optional
import watchtower
@@ -42,7 +43,7 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
:param filename_template: template for file name (local storage) or log stream name (remote)
"""
- def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: str):
+ def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: Optional[str] = None):
super().__init__(base_log_folder, filename_template)
split_arn = log_group_arn.split(':')
diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py
index 695c4623d9..0abea94c66 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -17,6 +17,7 @@
# under the License.
import os
import sys
+from typing import Optional
if sys.version_info >= (3, 8):
from functools import cached_property
@@ -35,7 +36,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
uploads to and reads from S3 remote storage.
"""
- def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: str):
+ def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: Optional[str] = None):
super().__init__(base_log_folder, filename_template)
self.remote_base = s3_log_folder
self.log_relative_path = ''
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 83c1163d80..64fce0df53 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -18,6 +18,7 @@
import logging
import sys
+import warnings
from collections import defaultdict
from datetime import datetime
from operator import attrgetter
@@ -31,15 +32,22 @@ import pendulum
from elasticsearch_dsl import Search
from airflow.configuration import conf
-from airflow.models import TaskInstance
+from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.json_formatter import JSONFormatter
from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
+from airflow.utils.session import create_session
# Elasticsearch hosted log type
EsLogMsgType = List[Tuple[str, str]]
+# Compatibility: Airflow 2.3.3 and up uses this method, which accesses the
+# LogTemplate model to record the log ID template used. If this function does
+# not exist, the task handler should use the log_id_template attribute instead.
+USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
+
class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
"""
@@ -65,8 +73,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
def __init__(
self,
base_log_folder: str,
- filename_template: str,
- log_id_template: str,
end_of_log_mark: str,
write_stdout: bool,
json_format: bool,
@@ -76,6 +82,9 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
host: str = "localhost:9200",
frontend: str = "localhost:5601",
es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"),
+ *,
+ filename_template: Optional[str] = None,
+ log_id_template: Optional[str] = None,
):
"""
:param base_log_folder: base folder to store logs locally
@@ -88,7 +97,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
self.client = elasticsearch.Elasticsearch([host], **es_kwargs) # type: ignore[attr-defined]
- self.log_id_template = log_id_template
+ if USE_PER_RUN_LOG_ID and log_id_template is not None:
+ warnings.warn(
+ "Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
+ DeprecationWarning,
+ )
+
+ self.log_id_template = log_id_template # Only used on Airflow < 2.3.2.
self.frontend = frontend
self.mark_end_on_close = True
self.end_of_log_mark = end_of_log_mark
@@ -103,7 +118,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
self.handler: Union[logging.FileHandler, logging.StreamHandler] # type: ignore[assignment]
def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
- dag_run = ti.get_dagrun()
+ with create_session() as session:
+ dag_run = ti.get_dagrun(session=session)
+ if USE_PER_RUN_LOG_ID:
+ log_id_template = dag_run.get_log_template(session=session).elasticsearch_id
+ else:
+ log_id_template = self.log_id_template
+
dag = ti.task.dag
assert dag is not None # For Mypy.
try:
@@ -126,7 +147,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
data_interval_end = ""
execution_date = dag_run.execution_date.isoformat()
- return self.log_id_template.format(
+ return log_id_template.format(
dag_id=ti.dag_id,
task_id=ti.task_id,
run_id=getattr(ti, "run_id", ""),
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py
index 92d133d109..81f1426d75 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -67,7 +67,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
*,
base_log_folder: str,
gcs_log_folder: str,
- filename_template: str,
+ filename_template: Optional[str] = None,
gcp_key_path: Optional[str] = None,
gcp_keyfile_dict: Optional[dict] = None,
gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS,
diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index 9ec0cdf646..f5e89c2c21 100644
--- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -44,8 +44,9 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
base_log_folder: str,
wasb_log_folder: str,
wasb_container: str,
- filename_template: str,
delete_local_copy: str,
+ *,
+ filename_template: Optional[str] = None,
) -> None:
super().__init__(base_log_folder, filename_template)
self.wasb_container = wasb_container
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 321e125288..2c53529a72 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -18,6 +18,7 @@
"""File logging handler for tasks."""
import logging
import os
+import warnings
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Tuple
@@ -27,6 +28,7 @@ from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.jwt_signer import JWTSigner
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
+from airflow.utils.session import create_session
if TYPE_CHECKING:
from airflow.models import TaskInstance
@@ -43,11 +45,15 @@ class FileTaskHandler(logging.Handler):
:param filename_template: template filename string
"""
- def __init__(self, base_log_folder: str, filename_template: str):
+ def __init__(self, base_log_folder: str, filename_template: Optional[str] = None):
super().__init__()
self.handler: Optional[logging.FileHandler] = None
self.local_base = base_log_folder
- self.filename_template, self.filename_jinja_template = parse_template_string(filename_template)
+ if filename_template is not None:
+ warnings.warn(
+ "Passing filename_template to FileTaskHandler is deprecated and has no effect",
+ DeprecationWarning,
+ )
def set_context(self, ti: "TaskInstance"):
"""
@@ -74,15 +80,19 @@ class FileTaskHandler(logging.Handler):
self.handler.close()
def _render_filename(self, ti: "TaskInstance", try_number: int) -> str:
- if self.filename_jinja_template:
+ with create_session() as session:
+ dag_run = ti.get_dagrun(session=session)
+ template = dag_run.get_log_template(session=session).filename
+ str_tpl, jinja_tpl = parse_template_string(template)
+
+ if jinja_tpl:
if hasattr(ti, "task"):
context = ti.get_template_context()
else:
- context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat())
+ context = Context(ti=ti, ts=dag_run.logical_date.isoformat())
context["try_number"] = try_number
- return render_template_to_string(self.filename_jinja_template, context)
- elif self.filename_template:
- dag_run = ti.get_dagrun()
+ return render_template_to_string(jinja_tpl, context)
+ elif str_tpl:
dag = ti.task.dag
assert dag is not None # For Mypy.
try:
@@ -97,7 +107,7 @@ class FileTaskHandler(logging.Handler):
data_interval_end = data_interval[1].isoformat()
else:
data_interval_end = ""
- return self.filename_template.format(
+ return str_tpl.format(
dag_id=ti.dag_id,
task_id=ti.task_id,
run_id=ti.run_id,
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index 396ab90a32..f241c22df1 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -121,6 +121,6 @@ class TaskLogReader:
attachment_filename = render_log_filename(
ti=ti,
try_number="all" if try_number is None else try_number,
- filename_template=dagrun.get_log_filename_template(session=session),
+ filename_template=dagrun.get_log_template(session=session).filename,
)
return attachment_filename
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py
index 614e1fa3a1..1b226be96f 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -99,7 +99,7 @@ class TestGetLog:
self.ti.hostname = 'localhost'
@pytest.fixture
- def configure_loggers(self, tmp_path):
+ def configure_loggers(self, tmp_path, create_log_template):
self.log_dir = tmp_path
dir_path = tmp_path / self.DAG_ID / self.TASK_ID / self.default_time.replace(':', '.')
@@ -112,9 +112,9 @@ class TestGetLog:
logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
logging_config['handlers']['task']['base_log_folder'] = self.log_dir
- logging_config['handlers']['task'][
- 'filename_template'
- ] = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+ create_log_template(
+ '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+ )
logging.config.dictConfig(logging_config)
diff --git a/tests/conftest.py b/tests/conftest.py
index 68d318e13c..b153c213d5 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -36,6 +36,9 @@ os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AWS_DEFAULT_REGION"] = os.environ.get("AWS_DEFAULT_REGION") or "us-east-1"
os.environ["CREDENTIALS_DIR"] = os.environ.get('CREDENTIALS_DIR') or "/files/airflow-breeze-config/keys"
+from airflow import settings # noqa: E402
+from airflow.models.tasklog import LogTemplate # noqa: E402
+
from tests.test_utils.perf.perf_kit.sqlalchemy import ( # noqa isort:skip
count_queries,
trace_queries,
@@ -775,3 +778,39 @@ def session():
with create_session() as session:
yield session
session.rollback()
+
+
+@pytest.fixture()
+def get_test_dag():
+ def _get(dag_id):
+ from airflow.models.dagbag import DagBag
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ dag_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'dags', f'{dag_id}.py')
+ dagbag = DagBag(dag_folder=dag_file, include_examples=False)
+
+ dag = dagbag.get_dag(dag_id)
+ dag.sync_to_db()
+ SerializedDagModel.write_dag(dag)
+
+ return dag
+
+ return _get
+
+
+@pytest.fixture()
+def create_log_template(request):
+ session = settings.Session()
+
+ def _create_log_template(filename_template, elasticsearch_id=""):
+ log_template = LogTemplate(filename=filename_template, elasticsearch_id=elasticsearch_id)
+ session.add(log_template)
+ session.commit()
+
+ def _delete_log_template():
+ session.delete(log_template)
+ session.commit()
+
+ request.addfinalizer(_delete_log_template)
+
+ return _create_log_template
diff --git a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py
index 24eb73b92e..30e8cc32b9 100644
--- a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py
+++ b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py
@@ -35,10 +35,7 @@ class TestOSSTaskHandler(unittest.TestCase):
def setUp(self):
self.base_log_folder = 'local/airflow/logs/1.log'
self.oss_log_folder = f'oss://{MOCK_BUCKET_NAME}/airflow/logs'
- self.filename_template = '{try_number}.log'
- self.oss_task_handler = OSSTaskHandler(
- self.base_log_folder, self.oss_log_folder, self.filename_template
- )
+ self.oss_task_handler = OSSTaskHandler(self.base_log_folder, self.oss_log_folder)
@mock.patch(OSS_TASK_HANDLER_STRING.format('conf.get'))
@mock.patch(OSS_TASK_HANDLER_STRING.format('OSSHook'))
diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
index dbd2ae28d5..8b23218c8c 100644
--- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -16,17 +16,18 @@
# specific language governing permissions and limitations
# under the License.
import time
-import unittest
from datetime import datetime as dt
from unittest import mock
from unittest.mock import ANY, call
+import pytest
from watchtower import CloudWatchLogHandler
from airflow.models import DAG, DagRun, TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudwatchTaskHandler
+from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from tests.test_utils.config import conf_vars
@@ -44,19 +45,24 @@ def get_time_str(time_in_milliseconds):
return dt_time.strftime("%Y-%m-%d %H:%M:%S,000")
-@unittest.skipIf(mock_logs is None, "Skipping test because moto.mock_logs is not available")
-@mock_logs
-class TestCloudwatchTaskHandler(unittest.TestCase):
+@pytest.fixture(autouse=True, scope="module")
+def logmock():
+ with mock_logs():
+ yield
+
+
+@pytest.mark.skipif(mock_logs is None, reason="Skipping test because moto.mock_logs is not available")
+class TestCloudwatchTaskHandler:
@conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'})
- def setUp(self):
+ @pytest.fixture(autouse=True)
+ def setup(self, create_log_template):
self.remote_log_group = 'log_group_name'
self.region_name = 'us-west-2'
self.local_log_location = 'local/log/location'
- self.filename_template = '{dag_id}/{task_id}/{execution_date}/{try_number}.log'
+ create_log_template('{dag_id}/{task_id}/{execution_date}/{try_number}.log')
self.cloudwatch_task_handler = CloudwatchTaskHandler(
self.local_log_location,
f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
- self.filename_template,
)
self.cloudwatch_task_handler.hook
@@ -65,21 +71,29 @@ class TestCloudwatchTaskHandler(unittest.TestCase):
task_id = 'task_for_testing_cloudwatch_log_handler'
self.dag = DAG(dag_id=dag_id, start_date=date)
task = EmptyOperator(task_id=task_id, dag=self.dag)
- dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test")
- self.ti = TaskInstance(task=task)
+ dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="scheduled")
+ with create_session() as session:
+ session.add(dag_run)
+ session.commit()
+ session.refresh(dag_run)
+
+ self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
self.ti.dag_run = dag_run
self.ti.try_number = 1
self.ti.state = State.RUNNING
- self.remote_log_stream = f'{dag_id}/{task_id}/{date.isoformat()}/{self.ti.try_number}.log'.replace(
+ self.remote_log_stream = (f'{dag_id}/{task_id}/{date.isoformat()}/{self.ti.try_number}.log').replace(
':', '_'
)
moto.moto_api._internal.models.moto_api_backend.reset()
self.conn = boto3.client('logs', region_name=self.region_name)
- def tearDown(self):
+ yield
+
self.cloudwatch_task_handler.handler = None
+ with create_session() as session:
+ session.query(DagRun).delete()
def test_hook(self):
assert isinstance(self.cloudwatch_task_handler.hook, AwsLogsHook)
@@ -89,7 +103,6 @@ class TestCloudwatchTaskHandler(unittest.TestCase):
handler = CloudwatchTaskHandler(
self.local_log_location,
f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
- self.filename_template,
)
with mock.patch.object(handler.log, 'error') as mock_error:
diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py
index a322f167ec..d5a5185f75 100644
--- a/tests/providers/amazon/aws/log/test_s3_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py
@@ -17,7 +17,6 @@
# under the License.
import os
-import unittest
from unittest import mock
from unittest.mock import ANY
@@ -28,6 +27,7 @@ from airflow.models import DAG, DagRun, TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler
+from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from tests.test_utils.config import conf_vars
@@ -40,32 +40,39 @@ except ImportError:
mock_s3 = None
-@unittest.skipIf(mock_s3 is None, "Skipping test because moto.mock_s3 is not available")
-@mock_s3
-class TestS3TaskHandler(unittest.TestCase):
+@pytest.fixture(autouse=True, scope="module")
+def s3mock():
+ with mock_s3():
+ yield
+
+
+@pytest.mark.skipif(mock_s3 is None, reason="Skipping test because moto.mock_s3 is not available")
+class TestS3TaskHandler:
@conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'})
- def setUp(self):
- super().setUp()
+ @pytest.fixture(autouse=True)
+ def setup(self, create_log_template):
self.remote_log_base = 's3://bucket/remote/log/location'
self.remote_log_location = 's3://bucket/remote/log/location/1.log'
self.remote_log_key = 'remote/log/location/1.log'
self.local_log_location = 'local/log/location'
- self.filename_template = '{try_number}.log'
- self.s3_task_handler = S3TaskHandler(
- self.local_log_location, self.remote_log_base, self.filename_template
- )
+ create_log_template('{try_number}.log')
+ self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
# Vivfy the hook now with the config override
assert self.s3_task_handler.hook is not None
date = datetime(2016, 1, 1)
self.dag = DAG('dag_for_testing_s3_task_handler', start_date=date)
task = EmptyOperator(task_id='task_for_testing_s3_log_handler', dag=self.dag)
- dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test")
- self.ti = TaskInstance(task=task)
+ dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="manual")
+ with create_session() as session:
+ session.add(dag_run)
+ session.commit()
+ session.refresh(dag_run)
+
+ self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
self.ti.dag_run = dag_run
self.ti.try_number = 1
self.ti.state = State.RUNNING
- self.addCleanup(self.dag.clear)
self.conn = boto3.client('s3')
# We need to create the bucket since this is all in Moto's 'virtual'
@@ -73,7 +80,13 @@ class TestS3TaskHandler(unittest.TestCase):
moto.moto_api._internal.models.moto_api_backend.reset()
self.conn.create_bucket(Bucket="bucket")
- def tearDown(self):
+ yield
+
+ self.dag.clear()
+
+ with create_session() as session:
+ session.query(DagRun).delete()
+
if self.s3_task_handler.handler:
try:
os.remove(self.s3_task_handler.handler.baseFilename)
@@ -86,7 +99,7 @@ class TestS3TaskHandler(unittest.TestCase):
@conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'})
def test_hook_raises(self):
- handler = S3TaskHandler(self.local_log_location, self.remote_log_base, self.filename_template)
+ handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
with mock.patch.object(handler.log, 'error') as mock_error:
with mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook") as mock_hook:
mock_hook.side_effect = Exception('Failed to connect')
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 03eab3dbb7..e26a78fe77 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -58,9 +58,11 @@ class TestElasticsearchTaskHandler:
EXECUTION_DATE = datetime(2016, 1, 1)
LOG_ID = f'{DAG_ID}-{TASK_ID}-2016-01-01T00:00:00+00:00-1'
JSON_LOG_ID = f'{DAG_ID}-{TASK_ID}-{ElasticsearchTaskHandler._clean_date(EXECUTION_DATE)}-1'
+ FILENAME_TEMPLATE = '{try_number}.log'
@pytest.fixture()
- def ti(self, create_task_instance):
+ def ti(self, create_task_instance, create_log_template):
+ create_log_template(self.FILENAME_TEMPLATE, '{dag_id}-{task_id}-{execution_date}-{try_number}')
yield get_ti(
dag_id=self.DAG_ID,
task_id=self.TASK_ID,
@@ -73,8 +75,6 @@ class TestElasticsearchTaskHandler:
@elasticmock
def setup(self):
self.local_log_location = 'local/log/location'
- self.filename_template = '{try_number}.log'
- self.log_id_template = '{dag_id}-{task_id}-{execution_date}-{try_number}'
self.end_of_log_mark = 'end_of_log\n'
self.write_stdout = False
self.json_format = False
@@ -82,15 +82,13 @@ class TestElasticsearchTaskHandler:
self.host_field = 'host'
self.offset_field = 'offset'
self.es_task_handler = ElasticsearchTaskHandler(
- self.local_log_location,
- self.filename_template,
- self.log_id_template,
- self.end_of_log_mark,
- self.write_stdout,
- self.json_format,
- self.json_fields,
- self.host_field,
- self.offset_field,
+ base_log_folder=self.local_log_location,
+ end_of_log_mark=self.end_of_log_mark,
+ write_stdout=self.write_stdout,
+ json_format=self.json_format,
+ json_fields=self.json_fields,
+ host_field=self.host_field,
+ offset_field=self.offset_field,
)
self.es = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])
@@ -115,15 +113,13 @@ class TestElasticsearchTaskHandler:
assert es_conf == expected_dict
# ensure creating with configs does not fail
ElasticsearchTaskHandler(
- self.local_log_location,
- self.filename_template,
- self.log_id_template,
- self.end_of_log_mark,
- self.write_stdout,
- self.json_format,
- self.json_fields,
- self.host_field,
- self.offset_field,
+ base_log_folder=self.local_log_location,
+ end_of_log_mark=self.end_of_log_mark,
+ write_stdout=self.write_stdout,
+ json_format=self.json_format,
+ json_fields=self.json_fields,
+ host_field=self.host_field,
+ offset_field=self.offset_field,
es_kwargs=es_conf,
)
@@ -395,7 +391,7 @@ class TestElasticsearchTaskHandler:
self.es_task_handler.set_context(ti)
self.es_task_handler.close()
with open(
- os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+ os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
) as log_file:
# end_of_log_mark may contain characters like '\n' which is needed to
# have the log uploaded but will not be stored in elasticsearch.
@@ -409,7 +405,7 @@ class TestElasticsearchTaskHandler:
self.es_task_handler.set_context(ti)
self.es_task_handler.close()
with open(
- os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+ os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
) as log_file:
assert self.end_of_log_mark not in log_file.read()
assert self.es_task_handler.closed
@@ -419,7 +415,7 @@ class TestElasticsearchTaskHandler:
self.es_task_handler.set_context(ti)
self.es_task_handler.close()
with open(
- os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+ os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
) as log_file:
assert 0 == len(log_file.read())
@@ -428,7 +424,7 @@ class TestElasticsearchTaskHandler:
self.es_task_handler.handler = None
self.es_task_handler.close()
with open(
- os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+ os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
) as log_file:
assert 0 == len(log_file.read())
assert self.es_task_handler.closed
@@ -438,7 +434,7 @@ class TestElasticsearchTaskHandler:
self.es_task_handler.handler.stream = None
self.es_task_handler.close()
with open(
- os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+ os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
) as log_file:
assert self.end_of_log_mark in log_file.read()
assert self.es_task_handler.closed
@@ -447,7 +443,7 @@ class TestElasticsearchTaskHandler:
self.es_task_handler.handler.stream.close()
self.es_task_handler.close()
with open(
- os.path.join(self.local_log_location, self.filename_template.format(try_number=1))
+ os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1))
) as log_file:
assert self.end_of_log_mark in log_file.read()
assert self.es_task_handler.closed
@@ -478,15 +474,13 @@ class TestElasticsearchTaskHandler:
)
def test_get_external_log_url(self, ti, json_format, es_frontend, expected_url):
es_task_handler = ElasticsearchTaskHandler(
- self.local_log_location,
- self.filename_template,
- self.log_id_template,
- self.end_of_log_mark,
- self.write_stdout,
- json_format,
- self.json_fields,
- self.host_field,
- self.offset_field,
+ base_log_folder=self.local_log_location,
+ end_of_log_mark=self.end_of_log_mark,
+ write_stdout=self.write_stdout,
+ json_format=json_format,
+ json_fields=self.json_fields,
+ host_field=self.host_field,
+ offset_field=self.offset_field,
frontend=es_frontend,
)
url = es_task_handler.get_external_log_url(ti, ti.try_number)
@@ -508,8 +502,6 @@ class TestElasticsearchTaskHandler:
# arrange
handler = ElasticsearchTaskHandler(
base_log_folder=self.local_log_location,
- filename_template=self.filename_template,
- log_id_template=self.log_id_template,
end_of_log_mark=self.end_of_log_mark,
write_stdout=True,
json_format=True,
diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py
index 6517be8f31..b443a9f8ec 100644
--- a/tests/providers/google/cloud/log/test_gcs_task_handler.py
+++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py
@@ -49,13 +49,11 @@ class TestGCSTaskHandler:
yield td
@pytest.fixture(autouse=True)
- def gcs_task_handler(self, local_log_location):
- self.remote_log_base = "gs://bucket/remote/log/location"
- self.filename_template = "{try_number}.log"
+ def gcs_task_handler(self, create_log_template, local_log_location):
+ create_log_template("{try_number}.log")
self.gcs_task_handler = GCSTaskHandler(
base_log_folder=local_log_location,
- gcs_log_folder=self.remote_log_base,
- filename_template=self.filename_template,
+ gcs_log_folder="gs://bucket/remote/log/location",
)
yield self.gcs_task_handler
diff --git a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
index 4fe9676717..3c92aa78aa 100644
--- a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
+++ b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
@@ -22,23 +22,25 @@ from azure.common import AzureHttpError
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbTaskHandler
-from airflow.utils.state import State
+from airflow.utils.state import TaskInstanceState
from airflow.utils.timezone import datetime
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_dags, clear_db_runs
+DEFAULT_DATE = datetime(2020, 8, 10)
+
class TestWasbTaskHandler:
@pytest.fixture(autouse=True)
- def ti(self, create_task_instance):
- date = datetime(2020, 8, 10)
+ def ti(self, create_task_instance, create_log_template):
+ create_log_template("{try_number}.log")
ti = create_task_instance(
dag_id='dag_for_testing_wasb_task_handler',
task_id='task_for_testing_wasb_log_handler',
- execution_date=date,
- start_date=date,
- dagrun_state=State.RUNNING,
- state=State.RUNNING,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ dagrun_state=TaskInstanceState.RUNNING,
+ state=TaskInstanceState.RUNNING,
)
ti.try_number = 1
ti.hostname = 'localhost'
@@ -52,12 +54,10 @@ class TestWasbTaskHandler:
self.remote_log_location = 'remote/log/location/1.log'
self.local_log_location = 'local/log/location'
self.container_name = "wasb-container"
- self.filename_template = '{try_number}.log'
self.wasb_task_handler = WasbTaskHandler(
base_log_folder=self.local_log_location,
wasb_log_folder=self.wasb_log_folder,
wasb_container=self.container_name,
- filename_template=self.filename_template,
delete_local_copy=True,
)
@@ -68,9 +68,7 @@ class TestWasbTaskHandler:
@conf_vars({('logging', 'remote_log_conn_id'): 'wasb_default'})
def test_hook_raises(self):
- handler = WasbTaskHandler(
- self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True
- )
+ handler = self.wasb_task_handler
with mock.patch.object(handler.log, 'error') as mock_error:
with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook:
mock_hook.side_effect = AzureHttpError("failed to connect", 404)
@@ -120,15 +118,14 @@ class TestWasbTaskHandler:
[{'end_of_log': True}],
)
- def test_wasb_read_raises(self):
- handler = WasbTaskHandler(
- self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True
- )
+ @mock.patch(
+ "airflow.providers.microsoft.azure.hooks.wasb.WasbHook",
+ **{"return_value.read_file.side_effect": AzureHttpError("failed to connect", 404)},
+ )
+ def test_wasb_read_raises(self, mock_hook):
+ handler = self.wasb_task_handler
with mock.patch.object(handler.log, 'error') as mock_error:
- with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook:
- mock_hook.return_value.read_file.side_effect = AzureHttpError("failed to connect", 404)
-
- handler.wasb_read(self.remote_log_location, return_error=True)
+ handler.wasb_read(self.remote_log_location, return_error=True)
mock_error.assert_called_once_with(
'Could not read logs from remote/log/location/1.log',
exc_info=True,
@@ -164,9 +161,7 @@ class TestWasbTaskHandler:
)
def test_write_raises(self):
- handler = WasbTaskHandler(
- self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True
- )
+ handler = self.wasb_task_handler
with mock.patch.object(handler.log, 'error') as mock_error:
with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook:
mock_hook.return_value.load_string.side_effect = AzureHttpError("failed to connect", 404)
diff --git a/tests/task/task_runner/test_task_runner.py b/tests/task/task_runner/test_task_runner.py
index ab140e05f5..fc5f3cc894 100644
--- a/tests/task/task_runner/test_task_runner.py
+++ b/tests/task/task_runner/test_task_runner.py
@@ -36,6 +36,7 @@ class GetTaskRunner(unittest.TestCase):
def test_should_support_core_task_runner(self, mock_subprocess):
ti = mock.MagicMock(map_index=-1, run_as_user=None)
ti.get_template_context.return_value = {"ti": ti}
+ ti.get_dagrun.return_value.get_log_template.return_value.filename = "blah"
local_task_job = mock.MagicMock(task_instance=ti)
task_runner = get_task_runner(local_task_job)
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index 4e6e942741..9a76ada725 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -29,6 +29,7 @@ import pytest
from airflow import settings
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.models import DagRun
+from airflow.models.tasklog import LogTemplate
from airflow.operators.python import PythonOperator
from airflow.timetables.base import DataInterval
from airflow.utils import timezone
@@ -44,6 +45,7 @@ class TestLogView:
DAG_ID = "dag_log_reader"
TASK_ID = "task_log_reader"
DEFAULT_DATE = timezone.datetime(2017, 9, 1)
+ FILENAME_TEMPLATE = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(':', '.') }}/{{ try_number }}.log"
@pytest.fixture(autouse=True)
def log_dir(self):
@@ -70,9 +72,7 @@ class TestLogView:
def configure_loggers(self, log_dir, settings_folder):
logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
logging_config["handlers"]["task"]["base_log_folder"] = log_dir
- logging_config["handlers"]["task"][
- "filename_template"
- ] = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(':', '.') }}/{{ try_number }}.log"
+ logging_config["handlers"]["task"]["filename_template"] = self.FILENAME_TEMPLATE
settings_file = os.path.join(settings_folder, "airflow_local_settings.py")
with open(settings_file, "w") as handle:
new_logging_file = f"LOGGING_CONFIG = {logging_config}"
@@ -93,6 +93,10 @@ class TestLogView:
@pytest.fixture(autouse=True)
def prepare_db(self, create_task_instance):
+ session = settings.Session()
+ log_template = LogTemplate(filename=self.FILENAME_TEMPLATE, elasticsearch_id="")
+ session.add(log_template)
+ session.commit()
ti = create_task_instance(
dag_id=self.DAG_ID,
task_id=self.TASK_ID,
@@ -107,6 +111,8 @@ class TestLogView:
yield
clear_db_runs()
clear_db_dags()
+ session.delete(log_template)
+ session.commit()
def test_test_read_log_chunks_should_read_one_try(self):
task_log_reader = TaskLogReader()
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index f4b4f7b2e3..28b9c7cf1a 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -21,8 +21,6 @@ import logging.config
import os
import re
-import pytest
-
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.models import DAG, DagRun, TaskInstance
from airflow.operators.python import PythonOperator
@@ -218,34 +216,37 @@ class TestFileTaskLogHandler:
os.remove(log_filename)
-@pytest.fixture()
-def filename_rendering_ti(session, create_task_instance):
- return create_task_instance(
- dag_id='dag_for_testing_filename_rendering',
- task_id='task_for_testing_filename_rendering',
- run_type=DagRunType.SCHEDULED,
- execution_date=DEFAULT_DATE,
- session=session,
- )
-
-
class TestFilenameRendering:
- def test_python_formatting(self, filename_rendering_ti):
- expected_filename = (
- f'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/'
- f'{DEFAULT_DATE.isoformat()}/42.log'
+ def test_python_formatting(self, create_log_template, create_task_instance):
+ create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
+ filename_rendering_ti = create_task_instance(
+ dag_id="dag_for_testing_filename_rendering",
+ task_id="task_for_testing_filename_rendering",
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
)
- fth = FileTaskHandler('', '{dag_id}/{task_id}/{execution_date}/{try_number}.log')
+ expected_filename = (
+ f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
+ f"{DEFAULT_DATE.isoformat()}/42.log"
+ )
+ fth = FileTaskHandler("")
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename
- def test_jinja_rendering(self, filename_rendering_ti):
- expected_filename = (
- f'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/'
- f'{DEFAULT_DATE.isoformat()}/42.log'
+ def test_jinja_rendering(self, create_log_template, create_task_instance):
+ create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log")
+ filename_rendering_ti = create_task_instance(
+ dag_id="dag_for_testing_filename_rendering",
+ task_id="task_for_testing_filename_rendering",
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
)
- fth = FileTaskHandler('', '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log')
+ expected_filename = (
+ f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/"
+ f"{DEFAULT_DATE.isoformat()}/42.log"
+ )
+ fth = FileTaskHandler("")
rendered_filename = fth._render_filename(filename_rendering_ti, 42)
assert expected_filename == rendered_filename
diff --git a/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log b/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log
new file mode 100644
index 0000000000..bc10ef7880
--- /dev/null
+++ b/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log
@@ -0,0 +1 @@
+Log for testing.
diff --git a/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log b/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log
new file mode 100644
index 0000000000..bc10ef7880
--- /dev/null
+++ b/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log
@@ -0,0 +1 @@
+Log for testing.
diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py
index 988d285936..fd136351cf 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -85,9 +85,6 @@ def log_app(backup_modules):
logging_config['handlers']['task']['base_log_folder'] = str(
pathlib.Path(__file__, "..", "..", "test_logs").resolve(),
)
- logging_config['handlers']['task'][
- 'filename_template'
- ] = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log'
with tempfile.TemporaryDirectory() as settings_dir:
local_settings = pathlib.Path(settings_dir, "airflow_local_settings.py")