Posted to commits@airflow.apache.org by ds...@apache.org on 2023/02/11 06:23:54 UTC

[airflow] branch main updated: Revert "Enable individual trigger logging (#27758)" (#29472)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 60d4bcd1d1 Revert "Enable individual trigger logging (#27758)" (#29472)
60d4bcd1d1 is described below

commit 60d4bcd1d101bb56955081d14e3e138a0c960c5f
Author: Niko Oliveira <on...@amazon.com>
AuthorDate: Fri Feb 10 22:23:46 2023 -0800

    Revert "Enable individual trigger logging (#27758)" (#29472)
    
    This reverts commit 1b18a501fe818079e535838fa4f232b03365fc75.
---
 airflow/api_connexion/endpoints/log_endpoint.py    |   8 +-
 airflow/cli/cli_parser.py                          |   1 -
 airflow/cli/commands/task_command.py               |  30 +-
 airflow/cli/commands/triggerer_command.py          |  29 +-
 airflow/config_templates/config.yml                |  18 -
 airflow/config_templates/default_airflow.cfg       |  11 -
 .../example_time_delta_sensor_async.py             |   2 +-
 airflow/executors/base_executor.py                 |   3 +-
 airflow/executors/celery_kubernetes_executor.py    |   6 +-
 airflow/executors/kubernetes_executor.py           |  19 +-
 airflow/executors/local_kubernetes_executor.py     |   7 +-
 airflow/jobs/base_job.py                           |   4 +-
 airflow/jobs/local_task_job.py                     |  22 +-
 airflow/jobs/triggerer_job.py                      | 282 +------------
 airflow/models/taskinstance.py                     |  46 +--
 airflow/models/trigger.py                          |  15 +-
 .../amazon/aws/log/cloudwatch_task_handler.py      |   3 -
 .../providers/amazon/aws/log/s3_task_handler.py    |  64 ++-
 .../providers/elasticsearch/log/es_task_handler.py |  10 +-
 .../providers/google/cloud/log/gcs_task_handler.py |  97 ++---
 .../google/cloud/log/stackdriver_task_handler.py   |  50 +--
 .../microsoft/azure/log/wasb_task_handler.py       |  78 +---
 airflow/task/task_runner/standard_task_runner.py   |   5 +-
 airflow/triggers/base.py                           |  13 +-
 airflow/triggers/temporal.py                       |  13 +-
 airflow/utils/log/file_task_handler.py             | 354 ++++++----------
 airflow/utils/log/trigger_handler.py               | 139 -------
 airflow/utils/serve_logs.py                        |   8 +-
 .../www/static/js/components/TabWithTooltip.tsx    |  47 ---
 .../js/dag/details/taskInstance/Logs/utils.ts      |   1 +
 .../templates/triggerer/triggerer-deployment.yaml  |  39 +-
 .../triggerer/triggerer-networkpolicy.yaml         |  58 ---
 chart/templates/triggerer/triggerer-service.yaml   |  49 ---
 chart/values.schema.json                           |  51 ---
 chart/values.yaml                                  |  19 +-
 .../logging-monitoring/logging-tasks.rst           |  48 +--
 tests/api_connexion/endpoints/test_log_endpoint.py |   8 +-
 tests/charts/test_basic_helm_chart.py              |  36 +-
 tests/charts/test_extra_env_env_from.py            |   3 +-
 tests/charts/test_rbac.py                          |  34 +-
 tests/charts/test_triggerer.py                     |  39 +-
 tests/executors/test_base_executor.py              |   2 +-
 tests/executors/test_celery_kubernetes_executor.py |   8 +-
 tests/executors/test_kubernetes_executor.py        |  19 +-
 tests/executors/test_local_kubernetes_executor.py  |  15 +-
 tests/jobs/test_triggerer_job.py                   |  56 +--
 tests/jobs/test_triggerer_job_logging.py           | 445 ---------------------
 .../amazon/aws/log/test_cloudwatch_task_handler.py |  17 +
 .../amazon/aws/log/test_s3_task_handler.py         |  27 +-
 .../google/cloud/log/test_gcs_task_handler.py      |  40 +-
 .../microsoft/azure/log/test_wasb_task_handler.py  |  25 +-
 tests/utils/log/test_log_reader.py                 |  85 ++--
 tests/utils/test_log_handlers.py                   | 339 ++--------------
 tests/www/views/test_views_tasks.py                |  23 +-
 54 files changed, 487 insertions(+), 2383 deletions(-)

diff --git a/airflow/api_connexion/endpoints/log_endpoint.py b/airflow/api_connexion/endpoints/log_endpoint.py
index 8e3555d409..388b164727 100644
--- a/airflow/api_connexion/endpoints/log_endpoint.py
+++ b/airflow/api_connexion/endpoints/log_endpoint.py
@@ -21,7 +21,6 @@ from typing import Any
 from flask import Response, request
 from itsdangerous.exc import BadSignature
 from itsdangerous.url_safe import URLSafeSerializer
-from sqlalchemy.orm import joinedload
 from sqlalchemy.orm.session import Session
 
 from airflow.api_connexion import security
@@ -74,10 +73,9 @@ def get_log(
         metadata["download_logs"] = False
 
     task_log_reader = TaskLogReader()
-
     if not task_log_reader.supports_read:
         raise BadRequest("Task log handler does not support read logs.")
-    query = (
+    ti = (
         session.query(TaskInstance)
         .filter(
             TaskInstance.task_id == task_id,
@@ -86,10 +84,8 @@ def get_log(
             TaskInstance.map_index == map_index,
         )
         .join(TaskInstance.dag_run)
-        .options(joinedload("trigger"))
-        .options(joinedload("trigger.triggerer_job"))
+        .one_or_none()
     )
-    ti = query.one_or_none()
     if ti is None:
         metadata["end_of_log"] = True
         raise NotFound(title="TaskInstance not found")
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index bf2e98d0f3..08abbcfe37 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -2115,7 +2115,6 @@ airflow_commands: list[CLICommand] = [
             ARG_LOG_FILE,
             ARG_CAPACITY,
             ARG_VERBOSE,
-            ARG_SKIP_SERVE_LOGS,
         ),
     ),
     ActionCommand(
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 4ac228974c..9ab96e2faa 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -44,7 +44,6 @@ from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG
 from airflow.models.dagrun import DagRun
 from airflow.models.operator import needs_expansion
-from airflow.models.taskinstance import TaskReturnCode
 from airflow.settings import IS_K8S_EXECUTOR_POD
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
@@ -59,7 +58,6 @@ from airflow.utils.cli import (
     suppress_logs_and_warning,
 )
 from airflow.utils.dates import timezone
-from airflow.utils.log.file_task_handler import _set_task_deferred_context_var
 from airflow.utils.log.logging_mixin import StreamLogWriter
 from airflow.utils.log.secrets_masker import RedactedIO
 from airflow.utils.net import get_hostname
@@ -184,7 +182,7 @@ def _get_ti(
     return ti, dr_created
 
 
-def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None | TaskReturnCode:
+def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None:
     """
     Runs the task based on a mode.
 
@@ -195,11 +193,11 @@ def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None | Tas
     - by executor
     """
     if args.local:
-        return _run_task_by_local_task_job(args, ti)
+        _run_task_by_local_task_job(args, ti)
     elif args.raw:
-        return _run_raw_task(args, ti)
+        _run_raw_task(args, ti)
     else:
-        return _run_task_by_executor(args, dag, ti)
+        _run_task_by_executor(args, dag, ti)
 
 
 def _run_task_by_executor(args, dag, ti):
@@ -241,7 +239,7 @@ def _run_task_by_executor(args, dag, ti):
     executor.end()
 
 
-def _run_task_by_local_task_job(args, ti) -> TaskReturnCode | None:
+def _run_task_by_local_task_job(args, ti):
     """Run LocalTaskJob, which monitors the raw task execution process."""
     run_job = LocalTaskJob(
         task_instance=ti,
@@ -256,14 +254,11 @@ def _run_task_by_local_task_job(args, ti) -> TaskReturnCode | None:
         external_executor_id=_extract_external_executor_id(args),
     )
     try:
-        ret = run_job.run()
+        run_job.run()
 
     finally:
         if args.shut_down_logging:
             logging.shutdown()
-    with suppress(ValueError):
-        return TaskReturnCode(ret)
-    return None
 
 
 RAW_TASK_UNSUPPORTED_OPTION = [
@@ -274,9 +269,9 @@ RAW_TASK_UNSUPPORTED_OPTION = [
 ]
 
 
-def _run_raw_task(args, ti: TaskInstance) -> None | TaskReturnCode:
+def _run_raw_task(args, ti: TaskInstance) -> None:
     """Runs the main task handling code."""
-    return ti._run_raw_task(
+    ti._run_raw_task(
         mark_success=args.mark_success,
         job_id=args.job_id,
         pool=args.pool,
@@ -412,21 +407,18 @@ def task_run(args, dag=None):
     # this should be last thing before running, to reduce likelihood of an open session
     # which can cause trouble if running process in a fork.
     settings.reconfigure_orm(disable_connection_pool=True)
-    task_return_code = None
+
     try:
         if args.interactive:
-            task_return_code = _run_task_by_selected_method(args, dag, ti)
+            _run_task_by_selected_method(args, dag, ti)
         else:
             with _move_task_handlers_to_root(ti), _redirect_stdout_to_ti_log(ti):
-                task_return_code = _run_task_by_selected_method(args, dag, ti)
-                if task_return_code == TaskReturnCode.DEFERRED:
-                    _set_task_deferred_context_var()
+                _run_task_by_selected_method(args, dag, ti)
     finally:
         try:
             get_listener_manager().hook.before_stopping(component=TaskCommandMarker())
         except Exception:
             pass
-    return task_return_code
 
 
 @cli_utils.action_cli(check_db=False)
diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py
index 8bf0c2822d..64755f3830 100644
--- a/airflow/cli/commands/triggerer_command.py
+++ b/airflow/cli/commands/triggerer_command.py
@@ -18,35 +18,14 @@
 from __future__ import annotations
 
 import signal
-from contextlib import contextmanager
-from functools import partial
-from multiprocessing import Process
-from typing import Generator
 
 import daemon
 from daemon.pidfile import TimeoutPIDLockFile
 
 from airflow import settings
-from airflow.configuration import conf
 from airflow.jobs.triggerer_job import TriggererJob
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler
-from airflow.utils.serve_logs import serve_logs
-
-
-@contextmanager
-def _serve_logs(skip_serve_logs: bool = False) -> Generator[None, None, None]:
-    """Starts serve_logs sub-process"""
-    sub_proc = None
-    if skip_serve_logs is False:
-        port = conf.getint("logging", "trigger_log_server_port", fallback=8794)
-        sub_proc = Process(target=partial(serve_logs, port=port))
-        sub_proc.start()
-    try:
-        yield
-    finally:
-        if sub_proc:
-            sub_proc.terminate()
 
 
 @cli_utils.action_cli
@@ -65,18 +44,18 @@ def triggerer(args):
             stdout_handle.truncate(0)
             stderr_handle.truncate(0)
 
-            daemon_context = daemon.DaemonContext(
+            ctx = daemon.DaemonContext(
                 pidfile=TimeoutPIDLockFile(pid, -1),
                 files_preserve=[handle],
                 stdout=stdout_handle,
                 stderr=stderr_handle,
                 umask=int(settings.DAEMON_UMASK, 8),
             )
-            with daemon_context, _serve_logs(args.skip_serve_logs):
+            with ctx:
                 job.run()
+
     else:
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
         signal.signal(signal.SIGQUIT, sigquit_handler)
-        with _serve_logs(args.skip_serve_logs):
-            job.run()
+        job.run()
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 64067a4bcb..00aeb23b1d 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -788,24 +788,6 @@ logging:
       type: string
       example: ~
       default: "8793"
-    trigger_log_server_port:
-      description: |
-        Port to serve logs from for triggerer.  See worker_log_server_port description
-        for more info.
-      version_added: 2.6.0
-      type: string
-      example: ~
-      default: "8794"
-    interleave_timestamp_parser:
-      description: |
-        We must parse timestamps to interleave logs between trigger and task.  To do so,
-        we need to parse timestamps in log files. In case your log format is non-standard,
-        you may provide import path to callable which takes a string log line and returns
-        the timestamp (datetime.datetime compatible).
-      version_added: 2.6.0
-      type: string
-      example: path.to.my_func
-      default: ~
 metrics:
   description: |
     StatsD (https://github.com/etsy/statsd) integration settings.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 7a24fcb480..471b6c060e 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -434,17 +434,6 @@ extra_logger_names =
 # visible from the main web server to connect into the workers.
 worker_log_server_port = 8793
 
-# Port to serve logs from for triggerer.  See worker_log_server_port description
-# for more info.
-trigger_log_server_port = 8794
-
-# We must parse timestamps to interleave logs between trigger and task.  To do so,
-# we need to parse timestamps in log files. In case your log format is non-standard,
-# you may provide import path to callable which takes a string log line and returns
-# the timestamp (datetime.datetime compatible).
-# Example: interleave_timestamp_parser = path.to.my_func
-# interleave_timestamp_parser =
-
 [metrics]
 
 # StatsD (https://github.com/etsy/statsd) integration settings.
diff --git a/airflow/example_dags/example_time_delta_sensor_async.py b/airflow/example_dags/example_time_delta_sensor_async.py
index 8361009ff7..d1562c5751 100644
--- a/airflow/example_dags/example_time_delta_sensor_async.py
+++ b/airflow/example_dags/example_time_delta_sensor_async.py
@@ -36,6 +36,6 @@ with DAG(
     catchup=False,
     tags=["example"],
 ) as dag:
-    wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=30))
+    wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=10))
     finish = EmptyOperator(task_id="finish")
     wait >> finish
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index fbb64d078e..a8cc35875b 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -356,7 +356,7 @@ class BaseExecutor(LoggingMixin):
         """
         raise NotImplementedError()
 
-    def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
+    def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
         """
         This method can be implemented by any child class to return the task logs.
 
@@ -364,7 +364,6 @@ class BaseExecutor(LoggingMixin):
         :param log: log str
         :return: logs or tuple of logs and meta dict
         """
-        return [], []
 
     def end(self) -> None:  # pragma: no cover
         """Wait synchronously for the previously submitted job to complete."""
diff --git a/airflow/executors/celery_kubernetes_executor.py b/airflow/executors/celery_kubernetes_executor.py
index 667810acd4..6fb4a42c96 100644
--- a/airflow/executors/celery_kubernetes_executor.py
+++ b/airflow/executors/celery_kubernetes_executor.py
@@ -140,11 +140,11 @@ class CeleryKubernetesExecutor(LoggingMixin):
             cfg_path=cfg_path,
         )
 
-    def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
+    def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
         """Fetch task log from Kubernetes executor"""
         if ti.queue == self.kubernetes_executor.kubernetes_queue:
-            return self.kubernetes_executor.get_task_log(ti=ti)
-        return [], []
+            return self.kubernetes_executor.get_task_log(ti=ti, log=log)
+        return None
 
     def has_task(self, task_instance: TaskInstance) -> bool:
         """
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index b388a3b386..de48d66ffb 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -781,16 +781,14 @@ class KubernetesExecutor(BaseExecutor):
             namespace = pod_override.metadata.namespace
         return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
 
-    def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
-        messages = []
-        log = []
+    def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]:
+
         try:
-            from airflow.kubernetes.kube_client import get_kube_client
             from airflow.kubernetes.pod_generator import PodGenerator
 
             client = get_kube_client()
 
-            messages.append(f"Trying to get logs (last 100 lines) from worker pod {ti.hostname}")
+            log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
             selector = PodGenerator.build_selector_for_k8s_executor_pod(
                 dag_id=ti.dag_id,
                 task_id=ti.task_id,
@@ -818,10 +816,13 @@ class KubernetesExecutor(BaseExecutor):
             )
 
             for line in res:
-                log.append(line.decode())
-        except Exception as e:
-            messages.append(f"Reading from k8s pod logs failed: {str(e)}")
-        return messages, ["\n".join(log)]
+                log += line.decode()
+
+            return log
+
+        except Exception as f:
+            log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
+            return log, {"end_of_log": True}
 
     def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
         tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
diff --git a/airflow/executors/local_kubernetes_executor.py b/airflow/executors/local_kubernetes_executor.py
index 2b19b40a87..c29071b571 100644
--- a/airflow/executors/local_kubernetes_executor.py
+++ b/airflow/executors/local_kubernetes_executor.py
@@ -141,11 +141,12 @@ class LocalKubernetesExecutor(LoggingMixin):
             cfg_path=cfg_path,
         )
 
-    def get_task_log(self, ti: TaskInstance) -> tuple[list[str], list[str]]:
+    def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:
         """Fetch task log from kubernetes executor"""
         if ti.queue == self.kubernetes_executor.kubernetes_queue:
-            return self.kubernetes_executor.get_task_log(ti=ti)
-        return [], []
+            return self.kubernetes_executor.get_task_log(ti=ti, log=log)
+
+        return None
 
     def has_task(self, task_instance: TaskInstance) -> bool:
         """
diff --git a/airflow/jobs/base_job.py b/airflow/jobs/base_job.py
index 919e1a7fbc..580b5a36ed 100644
--- a/airflow/jobs/base_job.py
+++ b/airflow/jobs/base_job.py
@@ -249,7 +249,6 @@ class BaseJob(Base, LoggingMixin):
         """Starts the job."""
         Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
         # Adding an entry in the DB
-        ret = None
         with create_session() as session:
             self.state = State.RUNNING
             session.add(self)
@@ -257,7 +256,7 @@ class BaseJob(Base, LoggingMixin):
             make_transient(self)
 
             try:
-                ret = self._execute()
+                self._execute()
                 # In case of max runs or max duration
                 self.state = State.SUCCESS
             except SystemExit:
@@ -273,7 +272,6 @@ class BaseJob(Base, LoggingMixin):
                 session.commit()
 
         Stats.incr(self.__class__.__name__.lower() + "_end", 1, 1)
-        return ret
 
     def _execute(self):
         raise NotImplementedError("This method needs to be overridden")
diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index ed5ec5ffea..32b108993f 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -24,11 +24,10 @@ import psutil
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.jobs.base_job import BaseJob
-from airflow.models.taskinstance import TaskInstance, TaskReturnCode
+from airflow.models.taskinstance import TaskInstance
 from airflow.stats import Stats
 from airflow.task.task_runner import get_task_runner
 from airflow.utils import timezone
-from airflow.utils.log.file_task_handler import _set_task_deferred_context_var
 from airflow.utils.net import get_hostname
 from airflow.utils.platform import IS_WINDOWS
 from airflow.utils.session import provide_session
@@ -105,7 +104,7 @@ class LocalTaskJob(BaseJob):
 
         super().__init__(*args, **kwargs)
 
-    def _execute(self) -> int | None:
+    def _execute(self):
         self.task_runner = get_task_runner(self)
 
         def signal_handler(signum, frame):
@@ -150,9 +149,8 @@ class LocalTaskJob(BaseJob):
             external_executor_id=self.external_executor_id,
         ):
             self.log.info("Task is not able to be run")
-            return None
+            return
 
-        return_code = None
         try:
             self.task_runner.start()
 
@@ -185,7 +183,7 @@ class LocalTaskJob(BaseJob):
                 return_code = self.task_runner.return_code(timeout=max_wait_time)
                 if return_code is not None:
                     self.handle_task_exit(return_code)
-                    return return_code
+                    return
 
                 self.heartbeat()
 
@@ -200,7 +198,6 @@ class LocalTaskJob(BaseJob):
                         f"Time since last heartbeat({time_since_last_heartbeat:.2f}s) exceeded limit "
                         f"({heartbeat_time_limit}s)."
                     )
-            return return_code
         finally:
             self.on_kill()
 
@@ -212,15 +209,10 @@ class LocalTaskJob(BaseJob):
         """
         # Without setting this, heartbeat may get us
         self.terminating = True
+        self.log.info("Task exited with return code %s", return_code)
         self._log_return_code_metric(return_code)
-        is_deferral = return_code == TaskReturnCode.DEFERRED.value
-        if is_deferral:
-            self.log.info("Task exited with return code %s (task deferral)", return_code)
-            _set_task_deferred_context_var()
-        else:
-            self.log.info("Task exited with return code %s", return_code)
-
-        if not self.task_instance.test_mode and not is_deferral:
+
+        if not self.task_instance.test_mode:
             if conf.getboolean("scheduler", "schedule_after_task_execution", fallback=True):
                 self.task_instance.schedule_downstream_tasks()
 
diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index d62aaa3ff5..1c62a476bf 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -17,17 +17,13 @@
 from __future__ import annotations
 
 import asyncio
-import logging
 import os
 import signal
 import sys
 import threading
 import time
-import warnings
 from collections import deque
-from copy import copy
-from queue import SimpleQueue
-from typing import TYPE_CHECKING, Deque
+from typing import Deque
 
 from sqlalchemy import func
 
@@ -37,199 +33,10 @@ from airflow.models.trigger import Trigger
 from airflow.stats import Stats
 from airflow.triggers.base import BaseTrigger, TriggerEvent
 from airflow.typing_compat import TypedDict
-from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.log.trigger_handler import (
-    DropTriggerLogsFilter,
-    LocalQueueHandler,
-    TriggererHandlerWrapper,
-    TriggerMetadataFilter,
-    ctx_indiv_trigger,
-    ctx_task_instance,
-    ctx_trigger_end,
-    ctx_trigger_id,
-)
 from airflow.utils.module_loading import import_string
 from airflow.utils.session import provide_session
 
-if TYPE_CHECKING:
-    from airflow.models import TaskInstance
-
-HANDLER_SUPPORTS_TRIGGERER = False
-"""
-If this value is true, root handler is configured to log individual trigger messages
-visible in task logs.
-
-:meta private:
-"""
-
-SEND_TRIGGER_END_MARKER = True
-"""
-If handler natively supports triggers, may want to disable sending trigger end marker.
-
-:meta private:
-"""
-
-logger = logging.getLogger(__name__)
-
-
-DISABLE_WRAPPER = conf.getboolean("logging", "disable_trigger_handler_wrapper", fallback=False)
-DISABLE_LISTENER = conf.getboolean("logging", "disable_trigger_handler_queue_listener", fallback=False)
-
-
-def configure_trigger_log_handler():
-    """
-    Configure logging such that each trigger logs to its own file and
-    can be exposed through the airflow webserver.
-
-    Generally speaking, we take the log handler configured for logger ``airflow.task``,
-    wrap it with TriggerHandlerWrapper, and set it as the handler for root logger.
-
-    If there already is a handler configured for the root logger
-    and it supports triggers, we wrap it instead.
-
-    :meta private:
-    """
-    global HANDLER_SUPPORTS_TRIGGERER
-
-    def should_wrap(handler):
-        return handler.__dict__.get("trigger_should_wrap", False) or handler.__class__.__dict__.get(
-            "trigger_should_wrap", False
-        )
-
-    def should_queue(handler):
-        return handler.__dict__.get("trigger_should_queue", True) or handler.__class__.__dict__.get(
-            "trigger_should_queue", True
-        )
-
-    def send_trigger_end_marker(handler):
-        val = handler.__dict__.get("trigger_send_end_marker", None)
-        if val is not None:
-            return val
-
-        val = handler.__class__.__dict__.get("trigger_send_end_marker", None)
-        if val is not None:
-            return val
-        return True
-
-    def supports_triggerer(handler):
-        return (
-            should_wrap(handler)
-            or handler.__dict__.get("trigger_supported", False)
-            or handler.__class__.__dict__.get("trigger_supported", False)
-        )
-
-    def get_task_handler_from_logger(logger_):
-        for h in logger_.handlers:
-            if isinstance(h, FileTaskHandler) and not supports_triggerer(h):
-                warnings.warn(
-                    f"Handler {h.__class__.__name__} does not support "
-                    "individual trigger logging. Please check the release notes "
-                    "for your provider to see if a newer version supports "
-                    "individual trigger logging."
-                )
-            if supports_triggerer(h):
-                return h
-
-    def find_suitable_task_handler():
-        # check root logger then check airflow.task to see if a handler
-        # suitable for use with TriggerHandlerWrapper (has trigger_should_wrap
-        # attr, likely inherits from FileTaskHandler)
-        h = get_task_handler_from_logger(root_logger)
-        if not h:
-            # try to use handler configured from airflow task
-            logger.debug("No task logger configured for root logger; trying `airflow.task`.")
-            h = get_task_handler_from_logger(logging.getLogger("airflow.task"))
-            if h:
-                logger.debug("Using logging configuration from `airflow.task`")
-        if not h:
-            warnings.warn("Could not find log handler suitable for individual trigger logging.")
-            return None
-        return h
-
-    def filter_trigger_logs_from_other_root_handlers(new_hdlr):
-        # we add context vars to log records emitted for individual triggerer logging
-        # we want these records to be processed by our special trigger handler wrapper
-        # but not by any other handlers, so we filter out these messages from
-        # other handlers by adding DropTriggerLogsFilter
-        # we could consider only adding this filter to the default console logger
-        # so as to leave other custom handlers alone
-        for h in root_logger.handlers:
-            if h is not new_hdlr:
-                h.addFilter(DropTriggerLogsFilter())
-
-    def add_handler_wrapper_to_root(base_handler):
-        # first make sure we remove from root logger if it happens to be there
-        # it could have come from root or airflow.task, but we only need
-        # to make sure we remove from root, since messages will not flow
-        # through airflow.task
-        if base_handler in root_logger.handlers:
-            root_logger.removeHandler(base_handler)
-
-        logger.info("Setting up TriggererHandlerWrapper with handler %s", base_handler)
-        h = TriggererHandlerWrapper(base_handler=base_handler, level=base_handler.level)
-        # just extra cautious, checking if user manually configured it there
-        if h not in root_logger.handlers:
-            root_logger.addHandler(h)
-        return h
-
-    root_logger = logging.getLogger()
-    task_handler = find_suitable_task_handler()
-    if not task_handler:
-        return None
-    if TYPE_CHECKING:
-        assert isinstance(task_handler, FileTaskHandler)
-    if should_wrap(task_handler):
-        trigger_handler = add_handler_wrapper_to_root(task_handler)
-    else:
-        trigger_handler = copy(task_handler)
-        root_logger.addHandler(trigger_handler)
-    filter_trigger_logs_from_other_root_handlers(trigger_handler)
-    if send_trigger_end_marker(trigger_handler) is False:
-        global SEND_TRIGGER_END_MARKER
-        SEND_TRIGGER_END_MARKER = False
-    HANDLER_SUPPORTS_TRIGGERER = True
-    return should_queue(trigger_handler)
-
-
-def setup_queue_listener():
-    """
-    Route log messages to a queue and process them with QueueListener.
-
-    Airflow task handlers make blocking I/O calls.
-    We replace trigger log handlers, with LocalQueueHandler,
-    which sends log records to a queue.
-    Then we start a QueueListener in a thread, which is configured
-    to consume the queue and pass the records to the handlers as
-    originally configured. This keeps the handler I/O out of the
-    async event loop.
-
-    :meta private:
-    """
-    queue = SimpleQueue()
-    root_logger = logging.getLogger()
-
-    handlers: list[logging.Handler] = []
-
-    queue_handler = LocalQueueHandler(queue)
-    queue_handler.addFilter(TriggerMetadataFilter())
-
-    root_logger.addHandler(queue_handler)
-    for h in root_logger.handlers[:]:
-        if h is not queue_handler and "pytest" not in h.__module__:
-            root_logger.removeHandler(h)
-            handlers.append(h)
-
-    this_logger = logging.getLogger(__name__)
-    if handlers:
-        this_logger.info("Setting up logging queue listener with handlers %s", handlers)
-        listener = logging.handlers.QueueListener(queue, *handlers, respect_handler_level=True)
-        listener.start()
-        return listener
-    else:
-        this_logger.warning("Unable to set up individual trigger logging")
-        return None
-
 
 class TriggererJob(BaseJob):
     """
@@ -255,24 +62,6 @@ class TriggererJob(BaseJob):
         else:
             raise ValueError(f"Capacity number {capacity} is invalid")
 
-        should_queue = True
-        if DISABLE_WRAPPER:
-            self.log.warning(
-                "Skipping trigger log configuration; disabled by param "
-                "`disable_trigger_handler_wrapper=True`."
-            )
-        else:
-            should_queue = configure_trigger_log_handler()
-        self.listener = None
-        if DISABLE_LISTENER:
-            self.log.warning(
-                "Skipping trigger logger queue listener; disabled by param "
-                "`disable_trigger_handler_queue_listener=True`."
-            )
-        elif should_queue is False:
-            self.log.warning("Skipping trigger logger queue listener; disabled by handler setting.")
-        else:
-            self.listener = setup_queue_listener()
         # Set up runner async thread
         self.runner = TriggerRunner()
 
@@ -298,19 +87,12 @@ class TriggererJob(BaseJob):
         """
         self.runner.stop = True
 
-    def _kill_listener(self):
-        if self.listener:
-            for h in self.listener.handlers:
-                h.close()
-            self.listener.stop()
-
     def _exit_gracefully(self, signum, frame) -> None:
         """Helper method to clean up processor_agent to avoid leaving orphan processes."""
         # The first time, try to exit nicely
         if not self.runner.stop:
             self.log.info("Exiting gracefully upon receiving signal %s", signum)
             self.runner.stop = True
-            self._kill_listener()
         else:
             self.log.warning("Forcing exit due to second exit signal %s", signum)
             sys.exit(os.EX_SOFTWARE)
@@ -318,9 +100,6 @@ class TriggererJob(BaseJob):
     def _execute(self) -> None:
         self.log.info("Starting the triggerer")
         try:
-            # set job_id so that it can be used in log file names
-            self.runner.job_id = self.id
-
             # Kick off runner thread
             self.runner.start()
             # Start our own DB loop in the main thread
@@ -445,7 +224,6 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         self.to_cancel = deque()
         self.events = deque()
         self.failed_triggers = deque()
-        self.job_id = None
 
     def run(self):
         """Sync entrypoint - just runs arun in an async loop."""
@@ -467,10 +245,11 @@ class TriggerRunner(threading.Thread, LoggingMixin):
             await self.cleanup_finished_triggers()
             # Sleep for a bit
             await asyncio.sleep(1)
-            # Every minute, log status
+            # Every minute, log status if at least one trigger is running.
             if time.time() - last_status >= 60:
                 count = len(self.triggers)
-                self.log.info("%i triggers currently running", count)
+                if count > 0:
+                    self.log.info("%i triggers currently running", count)
                 last_status = time.time()
         # Wait for watchdog to complete
         await watchdog
@@ -570,29 +349,15 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 )
                 Stats.incr("triggers.blocked_main_thread")
 
-    @staticmethod
-    def set_individual_trigger_logging(trigger):
-        """
-        Setting these context vars allows log messages for individual triggers
-        to be routed to distinct files and filtered from triggerer stdout.
-        """
-        # set logging context vars for routing to appropriate handler
-        ctx_task_instance.set(trigger.task_instance)
-        ctx_trigger_id.set(trigger.trigger_id)
-        ctx_trigger_end.set(False)
-
-        # mark that we're in the context of an individual trigger so log records can be filtered
-        ctx_indiv_trigger.set(True)
+    # Async trigger logic
 
     async def run_trigger(self, trigger_id, trigger):
         """
         Wrapper which runs an actual trigger (they are async generators)
         and pushes their events into our outbound event deque.
         """
-        name = self.triggers[trigger_id]["name"]
-        self.log.info("trigger %s starting", name)
+        self.log.info("Trigger %s starting", self.triggers[trigger_id]["name"])
         try:
-            self.set_individual_trigger_logging(trigger)
             async for event in trigger.run():
                 self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event)
                 self.triggers[trigger_id]["events"] += 1
@@ -603,19 +368,8 @@ class TriggerRunner(threading.Thread, LoggingMixin):
             # allow triggers a chance to cleanup, either in that case or if
             # they exit cleanly.
             trigger.cleanup()
-            if SEND_TRIGGER_END_MARKER:
-                self.mark_trigger_end(trigger)
-
-            # unsetting ctx_indiv_trigger var restores stdout logging
-            ctx_indiv_trigger.set(None)
-            self.log.info("trigger %s completed", name)
 
-    @staticmethod
-    def mark_trigger_end(trigger):
-        if not HANDLER_SUPPORTS_TRIGGERER:
-            return
-        ctx_trigger_end.set(True)
-        trigger.log.log(level=100, msg="trigger end")
+    # Main-thread sync API
 
     def update_triggers(self, requested_trigger_ids: set[int]):
         """
@@ -650,34 +404,16 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                 continue
             # Resolve trigger record into an actual class instance
             try:
-                new_trigger_orm = new_triggers[new_id]
-                trigger_class = self.get_trigger_by_classpath(new_trigger_orm.classpath)
+                trigger_class = self.get_trigger_by_classpath(new_triggers[new_id].classpath)
             except BaseException as e:
                 # Either the trigger code or the path to it is bad. Fail the trigger.
                 self.failed_triggers.append((new_id, e))
                 continue
-            new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
-            self.set_trigger_logging_metadata(new_trigger_orm.task_instance, new_id, new_trigger_instance)
-            self.to_create.append((new_id, new_trigger_instance))
+            self.to_create.append((new_id, trigger_class(**new_triggers[new_id].kwargs)))
         # Enqueue orphaned triggers for cancellation
         for old_id in cancel_trigger_ids:
             self.to_cancel.append(old_id)
 
-    def set_trigger_logging_metadata(self, ti: TaskInstance, trigger_id, trigger):
-        """
-        Set up logging for triggers
-
-        We want to ensure that each trigger logs to its own file and that the log messages are not
-        propagated to parent loggers.
-
-        :meta private:
-        """
-        if ti:  # can be None in tests
-            ti.is_trigger_log_context = True
-        trigger.task_instance = ti
-        trigger.triggerer_job_id = self.job_id
-        trigger.trigger_id = trigger_id
-
     def get_trigger_by_classpath(self, classpath: str) -> type[BaseTrigger]:
         """
         Gets a trigger class by its classpath ("path.to.module.classname").
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index e351984678..d0c81641ef 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -28,9 +28,7 @@ import signal
 import warnings
 from collections import defaultdict
 from datetime import datetime, timedelta
-from enum import Enum
 from functools import partial
-from pathlib import PurePath
 from types import TracebackType
 from typing import TYPE_CHECKING, Any, Callable, Collection, Generator, Iterable, NamedTuple, Tuple
 from urllib.parse import quote
@@ -142,17 +140,6 @@ if TYPE_CHECKING:
 PAST_DEPENDS_MET = "past_depends_met"
 
 
-class TaskReturnCode(Enum):
-    """
-    Enum to signal manner of exit for task run command.
-
-    :meta private:
-    """
-
-    DEFERRED = 100
-    """When task exits with deferral to trigger."""
-
-
 @contextlib.contextmanager
 def set_current_context(context: Context) -> Generator[Context, None, None]:
     """
@@ -453,7 +440,7 @@ class TaskInstance(Base, LoggingMixin):
         viewonly=True,
     )
 
-    trigger = relationship("Trigger", uselist=False, back_populates="task_instance")
+    trigger = relationship("Trigger", uselist=False)
     triggerer_job = association_proxy("trigger", "triggerer_job")
     dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
     rendered_task_instance_fields = relationship("RenderedTaskInstanceFields", lazy="noload", uselist=False)
@@ -462,12 +449,6 @@ class TaskInstance(Base, LoggingMixin):
     note = association_proxy("task_instance_note", "content", creator=_creator_note)
     task: Operator  # Not always set...
 
-    is_trigger_log_context: bool = False
-    """Indicate to FileTaskHandler that logging context should be set up for trigger logging.
-
-    :meta private:
-    """
-
     def __init__(
         self,
         task: Operator,
@@ -618,17 +599,15 @@ class TaskInstance(Base, LoggingMixin):
         """
         dag: DAG | DagModel
         # Use the dag if we have it, else fallback to the ORM dag_model, which might not be loaded
-        if hasattr(self, "task") and hasattr(self.task, "dag") and self.task.dag is not None:
+        if hasattr(self, "task") and hasattr(self.task, "dag"):
             dag = self.task.dag
         else:
             dag = self.dag_model
 
         should_pass_filepath = not pickle_id and dag
-        path: PurePath | None = None
+        path = None
         if should_pass_filepath:
             if dag.is_subdag:
-                if TYPE_CHECKING:
-                    assert dag.parent_dag is not None
                 path = dag.parent_dag.relative_fileloc
             else:
                 path = dag.relative_fileloc
@@ -636,6 +615,7 @@ class TaskInstance(Base, LoggingMixin):
             if path:
                 if not path.is_absolute():
                     path = "DAGS_FOLDER" / path
+                path = str(path)
 
         return TaskInstance.generate_command(
             self.dag_id,
@@ -670,7 +650,7 @@ class TaskInstance(Base, LoggingMixin):
         ignore_ti_state: bool = False,
         local: bool = False,
         pickle_id: int | None = None,
-        file_path: PurePath | str | None = None,
+        file_path: str | None = None,
         raw: bool = False,
         job_id: str | None = None,
         pool: str | None = None,
@@ -726,7 +706,7 @@ class TaskInstance(Base, LoggingMixin):
         if raw:
             cmd.extend(["--raw"])
         if file_path:
-            cmd.extend(["--subdir", os.fspath(file_path)])
+            cmd.extend(["--subdir", file_path])
         if cfg_path:
             cmd.extend(["--cfg-path", cfg_path])
         if map_index != -1:
@@ -1314,10 +1294,7 @@ class TaskInstance(Base, LoggingMixin):
                 session.commit()
                 return False
 
-        if self.next_kwargs is not None:
-            self.log.info("Resuming after deferral")
-        else:
-            self.log.info("Starting attempt %s of %s", self.try_number, self.max_tries + 1)
+        self.log.info("Starting attempt %s of %s", self.try_number, self.max_tries + 1)
         self._try_number += 1
 
         if not test_mode:
@@ -1379,7 +1356,7 @@ class TaskInstance(Base, LoggingMixin):
         job_id: str | None = None,
         pool: str | None = None,
         session: Session = NEW_SESSION,
-    ) -> TaskReturnCode | None:
+    ) -> None:
         """
         Immediately runs the task (without checking or changing db state
         before execution) and then sets the appropriate final state after
@@ -1435,7 +1412,7 @@ class TaskInstance(Base, LoggingMixin):
                 session.add(Log(self.state, self))
                 session.merge(self)
                 session.commit()
-            return TaskReturnCode.DEFERRED
+            return
         except AirflowSkipException as e:
             # Recording SKIP
             # log only if exception has any arguments to prevent log flooding
@@ -1447,7 +1424,7 @@ class TaskInstance(Base, LoggingMixin):
         except AirflowRescheduleException as reschedule_exception:
             self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, session=session)
             session.commit()
-            return None
+            return
         except (AirflowFailException, AirflowSensorTimeout) as e:
             # If AirflowFailException is raised, task should not retry.
             # If a sensor in reschedule mode reaches timeout, task should not retry.
@@ -1464,7 +1441,7 @@ class TaskInstance(Base, LoggingMixin):
                 self.clear_next_method_args()
                 session.merge(self)
                 session.commit()
-                return None
+                return
             else:
                 self.handle_failure(e, test_mode, context, session=session)
                 session.commit()
@@ -1497,7 +1474,6 @@ class TaskInstance(Base, LoggingMixin):
                 )
 
             session.commit()
-        return None
 
     def _register_dataset_changes(self, *, session: Session) -> None:
         for obj in self.task.outlets or []:
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index 7fe62bc296..57d2ac8f26 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -21,7 +21,7 @@ from traceback import format_exception
 from typing import Any, Iterable
 
 from sqlalchemy import Column, Integer, String, func, or_
-from sqlalchemy.orm import joinedload, relationship
+from sqlalchemy.orm import relationship
 
 from airflow.models.base import Base
 from airflow.models.taskinstance import TaskInstance
@@ -65,8 +65,6 @@ class Trigger(Base):
         uselist=False,
     )
 
-    task_instance = relationship("TaskInstance", back_populates="trigger", lazy="joined", uselist=False)
-
     def __init__(self, classpath: str, kwargs: dict[str, Any], created_date: datetime.datetime | None = None):
         super().__init__()
         self.classpath = classpath
@@ -89,16 +87,7 @@ class Trigger(Base):
         Fetches all of the Triggers by ID and returns a dict mapping
         ID -> Trigger instance
         """
-        query = (
-            session.query(cls)
-            .filter(cls.id.in_(ids))
-            .options(
-                joinedload("task_instance"),
-                joinedload("task_instance.trigger"),
-                joinedload("task_instance.trigger.triggerer_job"),
-            )
-        )
-        return {obj.id: obj for obj in query}
+        return {obj.id: obj for obj in session.query(cls).filter(cls.id.in_(ids)).all()}
 
     @classmethod
     @provide_session
diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index d2f54cc780..e50a6d4d74 100644
--- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -40,8 +40,6 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
     :param filename_template: template for file name (local storage) or log stream name (remote)
     """
 
-    trigger_should_wrap = True
-
     def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: str | None = None):
         super().__init__(base_log_folder, filename_template)
         split_arn = log_group_arn.split(":")
@@ -67,7 +65,6 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
         self.handler = watchtower.CloudWatchLogHandler(
             log_group_name=self.log_group,
             log_stream_name=self._render_filename(ti, ti.try_number),
-            use_queues=not getattr(ti, "is_trigger_log_context", False),
             boto3_client=self.hook.get_conn(),
         )
 
diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py
index 831c864171..8535277b13 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -34,8 +34,6 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
     uploads to and reads from S3 remote storage.
     """
 
-    trigger_should_wrap = True
-
     def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: str | None = None):
         super().__init__(base_log_folder, filename_template)
         self.remote_base = s3_log_folder
@@ -55,10 +53,9 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
         super().set_context(ti)
         # Local location and remote location is needed to open and
         # upload local log file to S3 remote storage.
-        full_path = self.handler.baseFilename
-        self.log_relative_path = pathlib.Path(full_path).relative_to(self.local_base).as_posix()
-        is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
-        self.upload_on_close = is_trigger_log_context or not ti.raw
+        self.log_relative_path = self._render_filename(ti, ti.try_number)
+        self.upload_on_close = not ti.raw
+
         # Clear the file first so that duplicate data is not uploaded
         # when re-using the same path (e.g. with rescheduled sensors)
         if self.upload_on_close:
@@ -89,47 +86,42 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
         # Mark closed so we don't double write if close is called twice
         self.closed = True
 
-    def _read_remote_logs(self, ti, try_number, metadata=None):
-        # Explicitly getting log relative path is necessary as the given
-        # task instance might be different than task instance passed in
-        # in set_context method.
-        worker_log_rel_path = self._render_filename(ti, try_number)
-
-        logs = []
-        messages = []
-        bucket, prefix = self.hook.parse_s3_url(s3url=os.path.join(self.remote_base, worker_log_rel_path))
-        keys = self.hook.list_keys(bucket_name=bucket, prefix=prefix)
-        if keys:
-            keys = [f"s3://{bucket}/{key}" for key in keys]
-            messages.extend(["Found logs in s3:", *[f"  * {x}" for x in sorted(keys)]])
-            for key in sorted(keys):
-                logs.append(self.s3_read(key, return_error=True))
-        else:
-            messages.append(f"No logs found on s3 for ti={ti}")
-        return messages, logs
-
     def _read(self, ti, try_number, metadata=None):
         """
         Read logs of given task instance and try_number from S3 remote storage.
         If failed, read the log from task instance host machine.
 
-        todo: when min airflow version >= 2.6 then remove this method (``_read``)
-
         :param ti: task instance object
         :param try_number: task instance try_number to read logs from
         :param metadata: log metadata,
                          can be used for steaming log reading and auto-tailing.
         """
-        # from airflow 2.6 we no longer implement the _read method
-        if hasattr(super(), "_read_remote_logs"):
-            return super()._read(ti, try_number, metadata)
-        # if we get here, we're on airflow < 2.6 and we use this backcompat logic
-        messages, logs = self._read_remote_logs(ti, try_number, metadata)
-        if logs:
-            return "".join(f"*** {x}\n" for x in messages) + "\n".join(logs), {"end_of_log": True}
+        # Explicitly getting log relative path is necessary as the given
+        # task instance might be different than task instance passed in
+        # in set_context method.
+        log_relative_path = self._render_filename(ti, try_number)
+        remote_loc = os.path.join(self.remote_base, log_relative_path)
+
+        log_exists = False
+        log = ""
+
+        try:
+            log_exists = self.s3_log_exists(remote_loc)
+        except Exception as error:
+            self.log.exception("Failed to verify remote log exists %s.", remote_loc)
+            log = f"*** Failed to verify remote log exists {remote_loc}.\n{error}\n"
+
+        if log_exists:
+            # If S3 remote file exists, we do not fetch logs from task instance
+            # local machine even if there are errors reading remote logs, as
+            # returned remote_log will contain error messages.
+            remote_log = self.s3_read(remote_loc, return_error=True)
+            log = f"*** Reading remote log from {remote_loc}.\n{remote_log}\n"
+            return log, {"end_of_log": True}
         else:
+            log += "*** Falling back to local log\n"
             local_log, metadata = super()._read(ti, try_number, metadata)
-            return "*** Falling back to local log\n" + local_log, metadata
+            return log + local_log, metadata
 
     def s3_log_exists(self, remote_log_location: str) -> bool:
         """
@@ -180,7 +172,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
 
         # Default to a single retry attempt because s3 upload failures are
         # rare but occasionally occur.  Multiple retry attempts are unlikely
-        # to help as they usually indicate non-ephemeral errors.
+        # to help as they usually indicate non-empheral errors.
         for try_num in range(1 + max_retry):
             try:
                 self.hook.load_string(
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 0c521e2c50..fce2f18814 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -72,8 +72,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
     MAX_LINE_PER_PAGE = 1000
     LOG_NAME = "Elasticsearch"
 
-    trigger_should_wrap = True
-
     def __init__(
         self,
         base_log_folder: str,
@@ -326,9 +324,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
 
         :param ti: task instance object
         """
-        is_trigger_log_context = getattr(ti, "is_trigger_log_context", None)
-        is_ti_raw = getattr(ti, "raw", None)
-        self.mark_end_on_close = not is_ti_raw and not is_trigger_log_context
+        self.mark_end_on_close = not ti.raw
 
         if self.json_format:
             self.formatter = ElasticsearchJSONFormatter(
@@ -364,9 +360,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         if self.closed:
             return
 
-        # todo: remove `getattr` when min airflow version >= 2.6
-        if not self.mark_end_on_close or getattr(self, "ctx_task_deferred", None):
-            # when we're closing due to task deferral, don't mark end of log
+        if not self.mark_end_on_close:
             self.closed = True
             return
 
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py
index 4523cddc5f..a264821093 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -17,9 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-import logging
 import os
-from pathlib import Path
 from typing import Collection
 
 # not sure why but mypy complains on missing `storage` but it is clearly there and is importable
@@ -28,7 +26,7 @@ from google.cloud import storage  # type: ignore[attr-defined]
 from airflow.compat.functools import cached_property
 from airflow.configuration import conf
 from airflow.exceptions import AirflowNotFoundException
-from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id
 from airflow.providers.google.common.consts import CLIENT_INFO
 from airflow.utils.log.file_task_handler import FileTaskHandler
@@ -40,8 +38,6 @@ _DEFAULT_SCOPESS = frozenset(
     ]
 )
 
-logger = logging.getLogger(__name__)
-
 
 class GCSTaskHandler(FileTaskHandler, LoggingMixin):
     """
@@ -65,8 +61,6 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         will be used.
     """
 
-    trigger_should_wrap = True
-
     def __init__(
         self,
         *,
@@ -122,10 +116,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         # Log relative path is used to construct local and remote
         # log path to upload log files into GCS and read from the
         # remote location.
-        full_path = self.handler.baseFilename
-        self.log_relative_path = Path(full_path).relative_to(self.local_base).as_posix()
-        is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
-        self.upload_on_close = is_trigger_log_context or not ti.raw
+        self.log_relative_path = self._render_filename(ti, ti.try_number)
+        self.upload_on_close = not ti.raw
 
     def close(self):
         """Close and upload local log file to remote storage GCS."""
@@ -152,60 +144,33 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         # Mark closed so we don't double write if close is called twice
         self.closed = True
 
-    def _add_message(self, msg):
-        filename, lineno, func, stackinfo = logger.findCaller()
-        record = logging.LogRecord("", logging.INFO, filename, lineno, msg + "\n", None, None, func=func)
-        return self.format(record)
-
-    def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]:
-        # Explicitly getting the log relative path is necessary because this method
-        # is called from the webserver via TaskLogReader, where we don't call set_context
-        # and may read logs for different TIs in each request.
-        messages = []
-        logs = []
-        worker_log_relative_path = self._render_filename(ti, try_number)
-        remote_loc = os.path.join(self.remote_base, worker_log_relative_path)
-        uris = []
-        bucket, prefix = _parse_gcs_url(remote_loc)
-        blobs = list(self.client.list_blobs(bucket_or_name=bucket, prefix=prefix))
-
-        if blobs:
-            uris = [f"gs://{bucket}/{b.name}" for b in blobs]
-            messages.extend(["Found remote logs:", *[f"  * {x}" for x in sorted(uris)]])
-        else:
-            messages.append(f"No logs found in GCS; ti=%s {ti}")
-        try:
-            for key in sorted(uris):
-                blob = storage.Blob.from_string(key, self.client)
-                remote_log = blob.download_as_bytes().decode()
-                if remote_log:
-                    logs.append(remote_log)
-        except Exception as e:
-            messages.append(f"Unable to read remote log {e}")
-        return messages, logs
-
     def _read(self, ti, try_number, metadata=None):
         """
         Read logs of given task instance and try_number from GCS.
         If failed, read the log from task instance host machine.
 
-        todo: when min airflow version >= 2.6, remove this method
-
         :param ti: task instance object
         :param try_number: task instance try_number to read logs from
         :param metadata: log metadata,
                          can be used for streaming log reading and auto-tailing.
         """
-        if hasattr(super(), "_read_remote_logs"):
-            # from Airflow 2.6, we don't implement the `_read` method.
-            # if parent has _read_remote_logs, we're >= 2.6
-            return super()._read(ti, try_number, metadata)
-
-        messages, logs = self._read_remote_logs(ti, try_number, metadata)
-        if not logs:
-            return super()._read(ti, try_number, metadata)
+        # Explicitly getting the log relative path is necessary, as the given
+        # task instance might be different from the task instance passed in
+        # to the set_context method.
+        log_relative_path = self._render_filename(ti, try_number)
+        remote_loc = os.path.join(self.remote_base, log_relative_path)
 
-        return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), {"end_of_log": True}
+        try:
+            blob = storage.Blob.from_string(remote_loc, self.client)
+            remote_log = blob.download_as_bytes().decode()
+            log = f"*** Reading remote log from {remote_loc}.\n{remote_log}\n"
+            return log, {"end_of_log": True}
+        except Exception as e:
+            log = f"*** Unable to read remote log from {remote_loc}\n*** {str(e)}\n\n"
+            self.log.error(log)
+            local_log, metadata = super()._read(ti, try_number, metadata)
+            log += local_log
+            return log, metadata
 
     def gcs_write(self, log, remote_log_location):
         """
@@ -220,28 +185,12 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
             old_log = blob.download_as_bytes().decode()
             log = "\n".join([old_log, log]) if old_log else log
         except Exception as e:
-            if self.no_log_found(e):
-                pass
-            else:
-                log += self._add_message(
-                    f"Error checking for previous log; if exists, may be overwritten: {str(e)}"
-                )
-                self.log.warning("Error checking for previous log: %s", e)
+            if not hasattr(e, "resp") or e.resp.get("status") != "404":
+                log = f"*** Previous log discarded: {str(e)}\n\n" + log
+                self.log.info("Previous log discarded: %s", e)
+
         try:
             blob = storage.Blob.from_string(remote_log_location, self.client)
             blob.upload_from_string(log, content_type="text/plain")
         except Exception as e:
             self.log.error("Could not write logs to %s: %s", remote_log_location, e)
-
-    @staticmethod
-    def no_log_found(exc):
-        """
-        Given exception, determine whether it is result of log not found.
-
-        :meta private:
-        """
-        if exc.args and isinstance(exc.args[0], str) and "No such object" in exc.args[0]:
-            return True
-        elif getattr(exc, "resp", {}).get("status") == "404":
-            return True
-        return False
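With _read_remote_logs removed, the GCS handler returns to a single remote-then-local read path: try the blob download first and fall back to the worker's local file on failure. A minimal sketch of that pattern, where fetch_remote and read_local are hypothetical stand-ins for the blob download and the FileTaskHandler fallback:

    from __future__ import annotations

    from typing import Callable


    def read_with_fallback(
        remote_loc: str,
        fetch_remote: Callable[[str], str],
        read_local: Callable[[], tuple[str, dict]],
    ) -> tuple[str, dict]:
        try:
            remote_log = fetch_remote(remote_loc)
            log = f"*** Reading remote log from {remote_loc}.\n{remote_log}\n"
            # A successful remote read is treated as the complete log.
            return log, {"end_of_log": True}
        except Exception as e:
            log = f"*** Unable to read remote log from {remote_loc}\n*** {e}\n\n"
            local_log, metadata = read_local()
            return log + local_log, metadata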
diff --git a/airflow/providers/google/cloud/log/stackdriver_task_handler.py b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
index 5190fbad76..0478693761 100644
--- a/airflow/providers/google/cloud/log/stackdriver_task_handler.py
+++ b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 import logging
-from contextvars import ContextVar
 from typing import Collection
 from urllib.parse import urlencode
 
@@ -34,13 +33,6 @@ from airflow.models import TaskInstance
 from airflow.providers.google.cloud.utils.credentials_provider import get_credentials_and_project_id
 from airflow.providers.google.common.consts import CLIENT_INFO
 
-try:
-    # todo: remove this conditional import when min airflow version >= 2.6
-    ctx_indiv_trigger: ContextVar | None
-    from airflow.utils.log.trigger_handler import ctx_indiv_trigger
-except ImportError:
-    ctx_indiv_trigger = None
-
 DEFAULT_LOGGER_NAME = "airflow"
 _GLOBAL_RESOURCE = Resource(type="global", labels={})
 
@@ -86,11 +78,6 @@ class StackdriverTaskHandler(logging.Handler):
     LOG_VIEWER_BASE_URL = "https://console.cloud.google.com/logs/viewer"
     LOG_NAME = "Google Stackdriver"
 
-    trigger_supported = True
-    trigger_should_queue = False
-    trigger_should_wrap = False
-    trigger_send_end_marker = False
-
     def __init__(
         self,
         gcp_key_path: str | None = None,
@@ -145,36 +132,23 @@ class StackdriverTaskHandler(logging.Handler):
         # arguments are a requirement for any class that derives from Transport class, hence ignore:
         return self.transport_type(self._client, self.name)  # type: ignore[call-arg]
 
-    def _get_labels(self, task_instance=None):
-        """When"""
-        if task_instance:
-            ti_labels = self._task_instance_to_labels(task_instance)
-        else:
-            ti_labels = self.task_instance_labels
+    def emit(self, record: logging.LogRecord) -> None:
+        """Actually log the specified logging record.
+
+        :param record: The record to be logged.
+        """
+        message = self.format(record)
         labels: dict[str, str] | None
-        if self.labels and ti_labels:
+        if self.labels and self.task_instance_labels:
             labels = {}
             labels.update(self.labels)
-            labels.update(ti_labels)
+            labels.update(self.task_instance_labels)
         elif self.labels:
             labels = self.labels
-        elif ti_labels:
-            labels = ti_labels
+        elif self.task_instance_labels:
+            labels = self.task_instance_labels
         else:
             labels = None
-        return labels or {}
-
-    def emit(self, record: logging.LogRecord) -> None:
-        """Actually log the specified logging record.
-
-        :param record: The record to be logged.
-        """
-        message = self.format(record)
-        ti = None
-        # todo: remove ctx_indiv_trigger is not None check when min airflow version >= 2.6
-        if ctx_indiv_trigger is not None and getattr(record, ctx_indiv_trigger.name, None):
-            ti = getattr(record, "task_instance", None)  # trigger context
-        labels = self._get_labels(ti)
         self._transport.send(record, message, resource=self.resource, labels=labels)
 
     def set_context(self, task_instance: TaskInstance) -> None:
@@ -317,10 +291,8 @@ class StackdriverTaskHandler(logging.Handler):
         page: ListLogEntriesResponse = next(response.pages)
         messages = []
         for entry in page.entries:
-            if "message" in (entry.json_payload or {}):
+            if "message" in entry.json_payload:
                 messages.append(entry.json_payload["message"])
-            elif entry.text_payload:
-                messages.append(entry.text_payload)
         return "\n".join(messages), page.next_page_token
 
     @classmethod
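The restored emit merges the handler-level labels with the task-instance labels computed in set_context. A small sketch of that precedence, using plain dicts rather than the handler's transport:

    from __future__ import annotations


    def merge_labels(handler_labels: dict | None, ti_labels: dict | None) -> dict | None:
        """Handler labels are applied first; task-instance labels win on key collisions."""
        if handler_labels and ti_labels:
            return {**handler_labels, **ti_labels}
        return handler_labels or ti_labels or None


    print(merge_labels({"env": "prod"}, {"task_id": "extract", "try_number": "1"}))
    # {'env': 'prod', 'task_id': 'extract', 'try_number': '1'}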
diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
index 52af3171c2..8c0fe22083 100644
--- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py
+++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py
@@ -19,10 +19,7 @@ from __future__ import annotations
 
 import os
 import shutil
-from pathlib import Path
-from typing import TYPE_CHECKING, Any
-
-from azure.core.exceptions import HttpResponseError
+from typing import Any
 
 from airflow.compat.functools import cached_property
 from airflow.configuration import conf
@@ -37,8 +34,6 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
     uploads to and reads from Wasb remote storage.
     """
 
-    trigger_should_wrap = True
-
     def __init__(
         self,
         base_log_folder: str,
@@ -79,13 +74,8 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
         super().set_context(ti)
         # Local location and remote location are needed to open and
         # upload local log file to Wasb remote storage.
-        if TYPE_CHECKING:
-            assert self.handler is not None
-
-        full_path = self.handler.baseFilename
-        self.log_relative_path = Path(full_path).relative_to(self.local_base).as_posix()
-        is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
-        self.upload_on_close = is_trigger_log_context or not ti.raw
+        self.log_relative_path = self._render_filename(ti, ti.try_number)
+        self.upload_on_close = not ti.raw
 
     def close(self) -> None:
         """Close and upload local log file to remote storage Wasb."""
@@ -114,43 +104,6 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
         # Mark closed so we don't double write if close is called twice
         self.closed = True
 
-    def _read_remote_logs(self, ti, try_number, metadata=None):
-        messages = []
-        logs = []
-        worker_log_relative_path = self._render_filename(ti, try_number)
-        # todo: fix this
-        # for some reason this handler was designed such that (1) container name is not configurable
-        # (i.e. it's hardcoded in airflow_local_settings.py) and (2) the "relative path" is actually...
-        # whatever you put in REMOTE_BASE_LOG_FOLDER i.e. it includes the "wasb://" in the blob
-        # name. it's very screwed up but to change it we have to be careful not to break backcompat.
-        prefix = os.path.join(self.remote_base, worker_log_relative_path)
-        blob_names = []
-        try:
-            blob_names = self.hook.get_blobs_list(container_name=self.wasb_container, prefix=prefix)
-        except HttpResponseError as e:
-            messages.append(f"tried listing blobs with prefix={prefix} and container={self.wasb_container}")
-            messages.append("could not list blobs " + str(e))
-            self.log.exception("can't list blobs")
-
-        if blob_names:
-            uris = [f"wasb://{self.wasb_container}/{b}" for b in blob_names]
-            messages.extend(["Found remote logs:", *[f"  * {x}" for x in sorted(uris)]])
-        else:
-            messages.append(f"No logs found in WASB; ti=%s {ti}")
-
-        for name in sorted(blob_names):
-            remote_log = ""
-            try:
-                remote_log = self.hook.read_file(self.wasb_container, name)
-                if remote_log:
-                    logs.append(remote_log)
-            except Exception as e:
-                messages.append(
-                    f"Unable to read remote blob '{name}' in container '{self.wasb_container}'\n{e}"
-                )
-                self.log.exception("Could not read blob")
-        return messages, logs
-
     def _read(
         self, ti, try_number: int, metadata: dict[str, Any] | None = None
     ) -> tuple[str, dict[str, bool]]:
@@ -158,23 +111,26 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
         Read logs of given task instance and try_number from Wasb remote storage.
         If failed, read the log from task instance host machine.
 
-        todo: when min airflow version >= 2.6, remove this method
-
         :param ti: task instance object
         :param try_number: task instance try_number to read logs from
         :param metadata: log metadata,
                          can be used for streaming log reading and auto-tailing.
         """
-        if hasattr(super(), "_read_remote_logs"):
-            # from Airflow 2.6, we don't implement the `_read` method.
-            # if parent has _read_remote_logs, we're >= 2.6
-            return super()._read(ti, try_number, metadata)
-
-        # below is backcompat, for airflow < 2.6
-        messages, logs = self._read_remote_logs(ti, try_number, metadata)
-        if not logs:
+        # Explicitly getting the log relative path is necessary, as the given
+        # task instance might be different from the task instance passed in
+        # to the set_context method.
+        log_relative_path = self._render_filename(ti, try_number)
+        remote_loc = os.path.join(self.remote_base, log_relative_path)
+
+        if self.wasb_log_exists(remote_loc):
+            # If Wasb remote file exists, we do not fetch logs from task instance
+            # local machine even if there are errors reading remote logs, as
+            # returned remote_log will contain error messages.
+            remote_log = self.wasb_read(remote_loc, return_error=True)
+            log = f"*** Reading remote log from {remote_loc}.\n{remote_log}\n"
+            return log, {"end_of_log": True}
+        else:
             return super()._read(ti, try_number, metadata)
-        return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), {"end_of_log": True}
 
     def wasb_log_exists(self, remote_log_location: str) -> bool:
         """
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index a06f2eb82d..4d2d55e927 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -24,7 +24,6 @@ import os
 import psutil
 from setproctitle import setproctitle
 
-from airflow.models.taskinstance import TaskReturnCode
 from airflow.settings import CAN_FORK
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
 from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
@@ -93,10 +92,8 @@ class StandardTaskRunner(BaseTaskRunner):
                     dag_id=self._task_instance.dag_id,
                     task_id=self._task_instance.task_id,
                 ):
-                    ret = args.func(args, dag=self.dag)
+                    args.func(args, dag=self.dag)
                     return_code = 0
-                    if isinstance(ret, TaskReturnCode):
-                        return_code = ret.value
             except Exception as exc:
                 return_code = 1
 
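Dropping TaskReturnCode means the forked child goes back to a two-state result: 0 when args.func returns, 1 on any exception. A compact sketch of that restored mapping, where run_task stands in for args.func:

    def run_and_get_return_code(run_task) -> int:
        try:
            run_task()
            return 0
        except Exception:
            return 1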
diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py
index f616df66a0..06bce36a4c 100644
--- a/airflow/triggers/base.py
+++ b/airflow/triggers/base.py
@@ -37,18 +37,7 @@ class BaseTrigger(abc.ABC, LoggingMixin):
     """
 
     def __init__(self, **kwargs):
-
-        # these values are set by triggerer when preparing to run the instance
-        # when run, they are injected into logger record.
-        self.task_instance = None
-        self.trigger_id = None
-
-    def _set_context(self, context):
-        """
-        This method, part of LoggingMixin, is used mainly for configuration of logging
-        for tasks, but is not used for triggers.
-        """
-        raise NotImplementedError
+        pass
 
     @abc.abstractmethod
     def serialize(self) -> tuple[str, dict[str, Any]]:
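For reference, a trigger still only has to implement serialize and an async run that yields events; the per-trigger logging attributes removed above are no longer set on it. A minimal sketch of a custom trigger, assuming a hypothetical classpath my_package.triggers.AlwaysFiresTrigger:

    from airflow.triggers.base import BaseTrigger, TriggerEvent


    class AlwaysFiresTrigger(BaseTrigger):
        def __init__(self, payload: str = "done", **kwargs):
            super().__init__(**kwargs)
            self.payload = payload

        def serialize(self):
            # Classpath plus kwargs so the triggerer can re-instantiate it.
            return ("my_package.triggers.AlwaysFiresTrigger", {"payload": self.payload})

        async def run(self):
            # Fire a single event immediately and finish.
            yield TriggerEvent(self.payload)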
diff --git a/airflow/triggers/temporal.py b/airflow/triggers/temporal.py
index d7f8ab5299..3967940a7e 100644
--- a/airflow/triggers/temporal.py
+++ b/airflow/triggers/temporal.py
@@ -58,20 +58,13 @@ class DateTimeTrigger(BaseTrigger):
         "the number of seconds until the time" in case the system clock changes
         unexpectedly, or handles a DST change poorly.
         """
-        # Sleep in successively smaller increments starting from 1 hour down to 10 seconds at a time
-        self.log.info("trigger starting")
-        for step in 3600, 60, 10:
-            seconds_remaining = (self.moment - timezone.utcnow()).total_seconds()
-            while seconds_remaining > 2 * step:
-                self.log.info(f"{int(seconds_remaining)} seconds remaining; sleeping {step} seconds")
-                await asyncio.sleep(step)
-                seconds_remaining = (self.moment - timezone.utcnow()).total_seconds()
+        # Sleep an hour at a time while it's more than 2 hours away
+        while (self.moment - timezone.utcnow()).total_seconds() > 2 * 3600:
+            await asyncio.sleep(3600)
         # Sleep a second at a time otherwise
         while self.moment > timezone.utcnow():
-            self.log.info("sleeping 1 second...")
             await asyncio.sleep(1)
         # Send our single event and then we're done
-        self.log.info("yielding event with payload %r", self.moment)
         yield TriggerEvent(self.moment)
 
 
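The sleep loop above is what a deferred task waits on. A short sketch of how an operator would hand itself off to this DateTimeTrigger, roughly following TimeDeltaSensorAsync; the operator class and callback name here are illustrative:

    from datetime import timedelta

    from airflow.models.baseoperator import BaseOperator
    from airflow.triggers.temporal import DateTimeTrigger
    from airflow.utils import timezone


    class WaitABitOperator(BaseOperator):
        def execute(self, context):
            moment = timezone.utcnow() + timedelta(minutes=5)
            # Free the worker slot; the triggerer fires an event at `moment`.
            self.defer(trigger=DateTimeTrigger(moment=moment), method_name="execute_complete")

        def execute_complete(self, context, event=None):
            self.log.info("Woke up at %s", event)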
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 1496b31fac..0d54783244 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -22,115 +22,23 @@ import logging
 import os
 import warnings
 from contextlib import suppress
-from enum import Enum
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Iterable
+from typing import TYPE_CHECKING, Any
 from urllib.parse import urljoin
 
-import pendulum
-
-from airflow.compat.functools import cached_property
 from airflow.configuration import conf
-from airflow.exceptions import RemovedInAirflow3Warning
+from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning
 from airflow.executors.executor_loader import ExecutorLoader
 from airflow.utils.context import Context
 from airflow.utils.helpers import parse_template_string, render_template_to_string
 from airflow.utils.log.logging_mixin import SetContextPropagate
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 from airflow.utils.session import create_session
-from airflow.utils.state import State, TaskInstanceState
+from airflow.utils.state import State
 
 if TYPE_CHECKING:
     from airflow.models import TaskInstance
 
-logger = logging.getLogger(__name__)
-
-
-class LogType(str, Enum):
-    """
-    Type of service from which we retrieve logs.
-
-    :meta private:
-    """
-
-    TRIGGER = "trigger"
-    WORKER = "worker"
-
-
-def _set_task_deferred_context_var():
-    """
-    Tell task log handler that task exited with deferral.
-
-    This exists for the sole purpose of telling elasticsearch handler not to
-    emit end_of_log mark after task deferral.
-
-    Depending on how the task is run, we may need to set this in task command or in local task job.
-    Kubernetes executor requires the local task job invocation; local executor requires the task
-    command invocation.
-
-    :meta private:
-    """
-    logger = logging.getLogger()
-    with suppress(StopIteration):
-        h = next(h for h in logger.handlers if hasattr(h, "ctx_task_deferred"))
-        h.ctx_task_deferred = True
-
-
-def _fetch_logs_from_service(url, log_relative_path):
-    import httpx
-
-    from airflow.utils.jwt_signer import JWTSigner
-
-    timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None)
-    signer = JWTSigner(
-        secret_key=conf.get("webserver", "secret_key"),
-        expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
-        audience="task-instance-logs",
-    )
-    response = httpx.get(
-        url,
-        timeout=timeout,
-        headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
-    )
-    response.encoding = "utf-8"
-    return response
-
-
-_parse_timestamp = conf.getimport("core", "interleave_timestamp_parser", fallback=None)
-
-if not _parse_timestamp:
-
-    def _parse_timestamp(line: str):
-        timestamp_str, _ = line.split(" ", 1)
-        return pendulum.parse(timestamp_str.strip("[]"))
-
-
-def _parse_timestamps_in_log_file(lines: Iterable[str]):
-    timestamp = None
-    next_timestamp = None
-    for idx, line in enumerate(lines):
-        if not line:
-            continue
-        with suppress(Exception):
-            # next_timestamp unchanged if line can't be parsed
-            next_timestamp = _parse_timestamp(line)
-        if next_timestamp:
-            timestamp = next_timestamp
-        yield timestamp, idx, line
-
-
-def _interleave_logs(*logs):
-    records = []
-    for log in logs:
-        records.extend(_parse_timestamps_in_log_file(log.splitlines()))
-    last = None
-    for _, _, v in sorted(
-        records, key=lambda x: (x[0], x[1]) if x[0] else (pendulum.datetime(2000, 1, 1), x[1])
-    ):
-        if v != last:  # dedupe
-            yield v
-        last = v
-
 
 class FileTaskHandler(logging.Handler):
     """
@@ -143,8 +51,6 @@ class FileTaskHandler(logging.Handler):
     :param filename_template: template filename string
     """
 
-    trigger_should_wrap = True
-
     def __init__(self, base_log_folder: str, filename_template: str | None = None):
         super().__init__()
         self.handler: logging.FileHandler | None = None
@@ -164,13 +70,6 @@ class FileTaskHandler(logging.Handler):
         :meta private:
         """
 
-        self.ctx_task_deferred = False
-        """
-        If true, task exited with deferral to trigger.
-
-        Some handlers emit "end of log" markers, and may not wish to do so when task defers.
-        """
-
     def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
         """
         Provide task_instance context to airflow task handler.
@@ -190,22 +89,6 @@ class FileTaskHandler(logging.Handler):
         self.handler.setLevel(self.level)
         return SetContextPropagate.MAINTAIN_PROPAGATE if self.maintain_propagate else None
 
-    @staticmethod
-    def add_triggerer_suffix(full_path, job_id=None):
-        """
-        Helper for deriving trigger log filename from task log filename.
-
-        E.g. given /path/to/file.log returns /path/to/file.log.trigger.123.log, where 123
-        is the triggerer id.  We use the triggerer ID instead of trigger ID to distinguish
-        the files because, rarely, the same trigger could get picked up by two different
-        triggerer instances.
-        """
-        full_path = Path(full_path).as_posix()
-        full_path += f".{LogType.TRIGGER}"
-        if job_id:
-            full_path += f".{job_id}.log"
-        return full_path
-
     def emit(self, record):
         if self.handler:
             self.handler.emit(record)
@@ -219,7 +102,6 @@ class FileTaskHandler(logging.Handler):
             self.handler.close()
 
     def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
-        """Returns the worker log filename."""
         with create_session() as session:
             dag_run = ti.get_dagrun(session=session)
             template = dag_run.get_log_template(session=session).filename
@@ -265,18 +147,56 @@ class FileTaskHandler(logging.Handler):
     def _read_grouped_logs(self):
         return False
 
-    @cached_property
-    def _executor_get_task_log(self) -> Callable[[TaskInstance], tuple[list[str], list[str]]]:
-        """This cached property avoids loading executor repeatedly."""
-        executor = ExecutorLoader.get_default_executor()
-        return executor.get_task_log
-
-    def _read(
-        self,
-        ti: TaskInstance,
-        try_number: int,
-        metadata: dict[str, Any] | None = None,
-    ):
+    def _get_task_log_from_worker(
+        self, ti: TaskInstance, log: str, log_relative_path: str
+    ) -> str | tuple[str, dict[str, bool]]:
+        import httpx
+
+        from airflow.utils.jwt_signer import JWTSigner
+
+        url = self._get_log_retrieval_url(ti, log_relative_path)
+        log += f"*** Fetching from: {url}\n"
+
+        try:
+            timeout = None  # No timeout
+            try:
+                timeout = conf.getint("webserver", "log_fetch_timeout_sec")
+            except (AirflowConfigException, ValueError):
+                pass
+
+            signer = JWTSigner(
+                secret_key=conf.get("webserver", "secret_key"),
+                expiration_time_in_seconds=conf.getint("webserver", "log_request_clock_grace", fallback=30),
+                audience="task-instance-logs",
+            )
+            response = httpx.get(
+                url,
+                timeout=timeout,
+                headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
+            )
+            response.encoding = "utf-8"
+
+            if response.status_code == 403:
+                log += (
+                    "*** !!!! Please make sure that all your Airflow components (e.g. "
+                    "schedulers, webservers and workers) have "
+                    "the same 'secret_key' configured in 'webserver' section and "
+                    "time is synchronized on all your machines (for example with ntpd) !!!!!\n***"
+                )
+                log += (
+                    "*** See more at https://airflow.apache.org/docs/apache-airflow/"
+                    "stable/configurations-ref.html#secret-key\n***"
+                )
+            # Check if the resource was properly fetched
+            response.raise_for_status()
+
+            log += "\n" + response.text
+            return log
+        except Exception as e:
+            log += f"*** Failed to fetch log file from worker. {str(e)}\n"
+            return log, {"end_of_log": True}
+
+    def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | None = None):
         """
         Template method that contains custom logic of reading
         logs given the try_number.
@@ -290,6 +210,7 @@ class FileTaskHandler(logging.Handler):
                                   which was retrieved in previous calls, this
                                   part will be skipped and only following text
                                   returned to be added to tail.
+
         :return: log message as a string and metadata.
                  Following attributes are used in metadata:
                  end_of_log: Boolean, True if end of log is reached or False
@@ -300,47 +221,45 @@ class FileTaskHandler(logging.Handler):
         # Task instance here might be different from task instance when
         # initializing the handler. Thus explicitly getting log location
         # is needed to get correct log path.
-        worker_log_rel_path = self._render_filename(ti, try_number)
-        messages_list: list[str] = []
-        remote_logs: list[str] = []
-        running_logs: list[str] = []
-        local_logs: list[str] = []
-        executor_messages: list[str] = []
-        executor_logs: list[str] = []
-        served_logs: list[str] = []
-        with suppress(NotImplementedError):
-            remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
-            messages_list.extend(remote_messages)
-        if ti.state == TaskInstanceState.RUNNING:
-            response = self._executor_get_task_log(ti)
-            if response:
-                executor_messages, executor_logs = response
-            if executor_messages:
-                messages_list.extend(messages_list)
-        if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not executor_messages:
-            served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path)
-            messages_list.extend(served_messages)
-        if not (remote_logs and ti.state not in State.unfinished):
-            # when finished, if we have remote logs, no need to check local
-            worker_log_full_path = Path(self.local_base, worker_log_rel_path)
-            local_messages, local_logs = self._read_from_local(worker_log_full_path)
-            messages_list.extend(local_messages)
-        logs = "\n".join(
-            _interleave_logs(
-                *local_logs,
-                *running_logs,
-                *remote_logs,
-                *(executor_logs or []),
-                *served_logs,
-            )
-        )
-        log_pos = len(logs)
-        messages = "".join([f"*** {x}\n" for x in messages_list])
+        log_relative_path = self._render_filename(ti, try_number)
+        location = os.path.join(self.local_base, log_relative_path)
+
+        log = ""
+        if os.path.exists(location):
+            try:
+                with open(location, encoding="utf-8", errors="surrogateescape") as file:
+                    log += f"*** Reading local file: {location}\n"
+                    log += "".join(file.readlines())
+            except Exception as e:
+                log = f"*** Failed to load local log file: {location}\n"
+                log += f"*** {str(e)}\n"
+                return log, {"end_of_log": True}
+        else:
+            log += f"*** Local log file does not exist: {location}\n"
+            executor = ExecutorLoader.get_default_executor()
+            task_log = None
+
+            task_log = executor.get_task_log(ti=ti, log=log)
+            if isinstance(task_log, tuple):
+                return task_log
+
+            if task_log is None:
+                log += "*** Failed to fetch log from executor. Falling back to fetching log from worker.\n"
+                task_log = self._get_task_log_from_worker(ti, log, log_relative_path=log_relative_path)
+
+            if isinstance(task_log, tuple):
+                return task_log
+
+            log = str(task_log)
+
+        # Process tailing if log is not at its end
         end_of_log = ti.try_number != try_number or ti.state not in [State.RUNNING, State.DEFERRED]
+        log_pos = len(log)
         if metadata and "log_pos" in metadata:
             previous_chars = metadata["log_pos"]
-            logs = logs[previous_chars:]  # Cut off previously passed log text as new tail
-        return messages + logs, {"end_of_log": end_of_log, "log_pos": log_pos}
+            log = log[previous_chars:]  # Cut off previously passed log text as new tail
+
+        return log, {"end_of_log": end_of_log, "log_pos": log_pos}
 
     @staticmethod
     def _get_pod_namespace(ti: TaskInstance):
@@ -350,28 +269,13 @@ class FileTaskHandler(logging.Handler):
             namespace = pod_override.metadata.namespace
         return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
 
-    def _get_log_retrieval_url(
-        self, ti: TaskInstance, log_relative_path: str, log_type: LogType | None = None
-    ) -> tuple[str, str]:
-        """Given TI, generate URL with which to fetch logs from service log server."""
-        if log_type == LogType.TRIGGER:
-            if not ti.triggerer_job:
-                raise RuntimeError("Could not build triggerer log URL; no triggerer job.")
-            config_key = "triggerer_log_server_port"
-            config_default = 8794
-            hostname = ti.triggerer_job.hostname
-            log_relative_path = self.add_triggerer_suffix(log_relative_path, job_id=ti.triggerer_job.id)
-        else:
-            hostname = ti.hostname
-            config_key = "worker_log_server_port"
-            config_default = 8793
-        return (
-            urljoin(
-                f"http://{hostname}:{conf.get('logging', config_key, fallback=config_default)}/log/",
-                log_relative_path,
-            ),
+    @staticmethod
+    def _get_log_retrieval_url(ti: TaskInstance, log_relative_path: str) -> str:
+        url = urljoin(
+            f"http://{ti.hostname}:{conf.get('logging', 'WORKER_LOG_SERVER_PORT')}/log/",
             log_relative_path,
         )
+        return url
 
     def read(self, task_instance, try_number=None, metadata=None):
         """
@@ -380,13 +284,15 @@ class FileTaskHandler(logging.Handler):
         :param task_instance: task instance object
         :param try_number: task instance try_number to read logs from. If None
                            it returns all logs separated by try_number
-        :param metadata: log metadata, can be used for streaming log reading and auto-tailing.
+        :param metadata: log metadata,
+                         can be used for streaming log reading and auto-tailing.
         :return: a list of listed tuples which order log string by host
         """
         # Task instance increments its try number when it starts to run.
         # So the log for a particular task try will only show up when
         # try number gets incremented in the DB, i.e. logs produced between
         # the CLI run and the try_number increment will not be displayed.
+
         if try_number is None:
             next_try = task_instance.next_try_number
             try_numbers = list(range(1, next_try))
@@ -400,8 +306,6 @@ class FileTaskHandler(logging.Handler):
 
         logs = [""] * len(try_numbers)
         metadata_array = [{}] * len(try_numbers)
-
-        # subclasses implement _read and may not have log_type, which was added recently
         for i, try_number_element in enumerate(try_numbers):
             log, out_metadata = self._read(task_instance, try_number_element, metadata)
             # es_task_handler return logs grouped by host. wrap other handler returning log string
@@ -447,12 +351,21 @@ class FileTaskHandler(logging.Handler):
         :param ti: task instance object
         :return: relative log path of the given task instance
         """
-        local_relative_path = self._render_filename(ti, ti.try_number)
-        full_path = os.path.join(self.local_base, local_relative_path)
-        if ti.is_trigger_log_context is True:
-            # if this is true, we're invoked via set_context in the context of
-            # setting up individual trigger logging. return trigger log path.
-            full_path = self.add_triggerer_suffix(full_path=full_path, job_id=ti.triggerer_job.id)
+        # To handle log writing when tasks are impersonated, the log files need to
+        # be writable by the user that runs the Airflow command and the user
+        # that is impersonated. This is mainly to handle corner cases with the
+        # SubDagOperator. When the SubDagOperator is run, all of the operators
+        # run under the impersonated user and create appropriate log files
+        # as the impersonated user. However, if the user manually runs tasks
+        # of the SubDagOperator through the UI, then the log files are created
+        # by the user that runs the Airflow command. For example, the Airflow
+        # run command may be run by the `airflow_sudoable` user, but the Airflow
+        # tasks may be run by the `airflow` user. If the log files are not
+        # writable by both users, then it's possible that re-running a task
+        # via the UI (or vice versa) results in a permission error as the task
+        # tries to write to a log file created by the other user.
+        relative_path = self._render_filename(ti, ti.try_number)
+        full_path = os.path.join(self.local_base, relative_path)
         self._prepare_log_folder(Path(full_path).parent)
 
         if not os.path.exists(full_path):
@@ -464,44 +377,3 @@ class FileTaskHandler(logging.Handler):
                 logging.warning("OSError while change ownership of the log file")
 
         return full_path
-
-    @staticmethod
-    def _read_from_local(worker_log_path: Path) -> tuple[list[str], list[str]]:
-        messages = []
-        logs = []
-        files = list(worker_log_path.parent.glob(worker_log_path.name + "*"))
-        if files:
-            messages.extend(["Found local files:", *[f"  * {x}" for x in sorted(files)]])
-        for file in sorted(files):
-            logs.append(Path(file).read_text())
-        return messages, logs
-
-    def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], list[str]]:
-        messages = []
-        logs = []
-        try:
-            log_type = LogType.TRIGGER if ti.triggerer_job else LogType.WORKER
-            url, rel_path = self._get_log_retrieval_url(ti, worker_log_rel_path, log_type=log_type)
-            response = _fetch_logs_from_service(url, rel_path)
-            if response.status_code == 403:
-                messages.append(
-                    "!!!! Please make sure that all your Airflow components (e.g. "
-                    "schedulers, webservers, workers and triggerer) have "
-                    "the same 'secret_key' configured in 'webserver' section and "
-                    "time is synchronized on all your machines (for example with ntpd)\n"
-                    "See more at https://airflow.apache.org/docs/apache-airflow/"
-                    "stable/configurations-ref.html#secret-key"
-                )
-            # Check if the resource was properly fetched
-            response.raise_for_status()
-            if response.text:
-                messages.append(f"Found logs served from host {url}")
-                logs.append(response.text)
-        except Exception as e:
-            messages.append(f"Could not read served logs: {str(e)}")
-            logger.exception("Could not read served logs")
-        return messages, logs
-
-    def _read_remote_logs(self, ti, try_number, metadata=None):
-        """Implement in subclasses to read from the remote service"""
-        raise NotImplementedError
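The restored _read keeps the same tailing contract as before: it returns the full log plus a log_pos, and on subsequent calls trims everything the caller has already seen. A small sketch of that contract in isolation:

    from __future__ import annotations


    def tail_since(full_log: str, metadata: dict | None) -> tuple[str, dict]:
        # Record where this read ended so the next call can skip what was already sent.
        log_pos = len(full_log)
        if metadata and "log_pos" in metadata:
            full_log = full_log[metadata["log_pos"]:]
        return full_log, {"end_of_log": False, "log_pos": log_pos}


    chunk1, meta = tail_since("line 1\nline 2\n", None)
    chunk2, meta = tail_since("line 1\nline 2\nline 3\n", meta)
    print(chunk2)  # only "line 3\n"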
diff --git a/airflow/utils/log/trigger_handler.py b/airflow/utils/log/trigger_handler.py
deleted file mode 100644
index 50756cb048..0000000000
--- a/airflow/utils/log/trigger_handler.py
+++ /dev/null
@@ -1,139 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from __future__ import annotations
-
-import asyncio
-import logging
-from contextvars import ContextVar
-from copy import copy
-from logging.handlers import QueueHandler
-
-from airflow.utils.log.file_task_handler import FileTaskHandler
-
-ctx_task_instance: ContextVar = ContextVar("task_instance")
-ctx_trigger_id: ContextVar = ContextVar("trigger_id")
-ctx_trigger_end: ContextVar = ContextVar("trigger_end")
-ctx_indiv_trigger: ContextVar = ContextVar("__individual_trigger")
-
-
-class TriggerMetadataFilter(logging.Filter):
-    """
-    Injects TI key, triggerer job_id, and trigger_id into the log record.
-
-    :meta private:
-    """
-
-    def filter(self, record):
-        for var in (
-            ctx_task_instance,
-            ctx_trigger_id,
-            ctx_trigger_end,
-            ctx_indiv_trigger,
-        ):
-            val = var.get(None)
-            if val is not None:
-                setattr(record, var.name, val)
-        return True
-
-
-class DropTriggerLogsFilter(logging.Filter):
-    """
-    If record has attr with name ctx_indiv_trigger, filter the record.
-
-    The purpose here is to prevent trigger logs from going to stdout
-    in the trigger service.
-
-    :meta private:
-    """
-
-    def filter(self, record):
-        return getattr(record, ctx_indiv_trigger.name, None) is None
-
-
-class TriggererHandlerWrapper(logging.Handler):
-    """
-    Wrap inheritors of FileTaskHandler and direct log messages
-    to them based on trigger_id.
-
-    :meta private:
-    """
-
-    trigger_should_queue = True
-
-    def __init__(self, base_handler: FileTaskHandler, level=logging.NOTSET):
-        super().__init__(level=level)
-        self.base_handler: FileTaskHandler = base_handler
-        self.handlers: dict[int, FileTaskHandler] = {}
-
-    def _make_handler(self, ti):
-        h = copy(self.base_handler)
-        h.set_context(ti=ti)
-        return h
-
-    def _get_or_create_handler(self, trigger_id, ti):
-        if trigger_id not in self.handlers:
-            self.handlers[trigger_id] = self._make_handler(ti)
-        return self.handlers[trigger_id]
-
-    def emit(self, record):
-        h = self._get_or_create_handler(record.trigger_id, record.task_instance)
-        h.emit(record)
-
-    def handle(self, record):
-        if not getattr(record, ctx_indiv_trigger.name, None):
-            return False
-        if record.trigger_end:
-            self.close_one(record.trigger_id)
-            return False
-        emit = self.filter(record)
-        if emit:
-            self.emit(record)
-        return emit
-
-    def close_one(self, trigger_id):
-        h = self.handlers.get(trigger_id)
-        if h:
-            h.close()
-            del self.handlers[trigger_id]
-
-    def flush(self):
-        for _, h in self.handlers.items():
-            h.flush()
-
-    def close(self):
-        for trigger_id in list(self.handlers.keys()):
-            h = self.handlers[trigger_id]
-            h.close()
-            del self.handlers[trigger_id]
-
-
-class LocalQueueHandler(QueueHandler):
-    """
-    Send messages to queue.
-
-    :meta private:
-    """
-
-    def emit(self, record: logging.LogRecord) -> None:
-        # There is no need to call `prepare` because queue is in same process.
-        try:
-            self.enqueue(record)
-        except asyncio.CancelledError:
-            raise
-        except Exception:
-            self.handleError(record)
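The deleted module's core idea was routing each log record to a per-trigger copy of the task handler, keyed by trigger_id, and closing that copy when the trigger ends. A stripped-down sketch of that routing pattern, not the removed implementation itself:

    from __future__ import annotations

    import logging
    from copy import copy


    class PerKeyRoutingHandler(logging.Handler):
        def __init__(self, base_handler: logging.Handler):
            super().__init__()
            self.base_handler = base_handler
            self.handlers: dict[int, logging.Handler] = {}

        def emit(self, record):
            key = getattr(record, "trigger_id", None)
            if key is None:
                return
            if key not in self.handlers:
                # The removed wrapper also called set_context(ti) on the copy here.
                self.handlers[key] = copy(self.base_handler)
            self.handlers[key].emit(record)

        def close_one(self, key):
            handler = self.handlers.pop(key, None)
            if handler:
                handler.close()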
diff --git a/airflow/utils/serve_logs.py b/airflow/utils/serve_logs.py
index d647fd74d3..7557339164 100644
--- a/airflow/utils/serve_logs.py
+++ b/airflow/utils/serve_logs.py
@@ -138,19 +138,19 @@ class StandaloneGunicornApplication(gunicorn.app.base.BaseApplication):
         return self.application
 
 
-def serve_logs(port=None):
+def serve_logs():
     """Serves logs generated by Worker"""
     setproctitle("airflow serve-logs")
     wsgi_app = create_app()
 
-    port = port or conf.getint("logging", "WORKER_LOG_SERVER_PORT")
+    worker_log_server_port = conf.getint("logging", "WORKER_LOG_SERVER_PORT")
 
     # If dual stack is available and IPV6_V6ONLY is not enabled on the socket
     # then when IPV6 is bound to it will also bind to IPV4 automatically
     if getattr(socket, "has_dualstack_ipv6", lambda: False)():
-        bind_option = GunicornOption("bind", f"[::]:{port}")
+        bind_option = GunicornOption("bind", f"[::]:{worker_log_server_port}")
     else:
-        bind_option = GunicornOption("bind", f"0.0.0.0:{port}")
+        bind_option = GunicornOption("bind", f"0.0.0.0:{worker_log_server_port}")
 
     options = [bind_option, GunicornOption("workers", 2)]
     StandaloneGunicornApplication(wsgi_app, options).run()
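The bind logic is unchanged apart from the port now coming straight from configuration: prefer a dual-stack "[::]" bind when the platform supports it, otherwise fall back to IPv4. A tiny sketch of that decision:

    import socket


    def bind_address(port: int) -> str:
        # "[::]" also accepts IPv4 connections on dual-stack hosts.
        if getattr(socket, "has_dualstack_ipv6", lambda: False)():
            return f"[::]:{port}"
        return f"0.0.0.0:{port}"


    print(bind_address(8793))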
diff --git a/airflow/www/static/js/components/TabWithTooltip.tsx b/airflow/www/static/js/components/TabWithTooltip.tsx
deleted file mode 100644
index c27b5f6b84..0000000000
--- a/airflow/www/static/js/components/TabWithTooltip.tsx
+++ /dev/null
@@ -1,47 +0,0 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import React from 'react';
-import {
-  Box,
-  useTab,
-  useMultiStyleConfig,
-  Button,
-  TabProps,
-} from '@chakra-ui/react';
-
-const TabWithTooltip = React.forwardRef<HTMLDivElement, TabProps>((props, ref) => {
-  const tabProps = useTab({ ...props, ref });
-  const styles = useMultiStyleConfig('Tabs', tabProps);
-
-  return (
-    <Box {...tabProps}>
-      <Button
-        __css={styles.tab}
-        {...tabProps}
-        pointerEvents={props.isDisabled ? 'none' : 'auto'}
-        py={3}
-      >
-        {tabProps.children}
-      </Button>
-    </Box>
-  );
-});
-
-export default TabWithTooltip;
diff --git a/airflow/www/static/js/dag/details/taskInstance/Logs/utils.ts b/airflow/www/static/js/dag/details/taskInstance/Logs/utils.ts
index 75e7ca1bed..31f9cdeb69 100644
--- a/airflow/www/static/js/dag/details/taskInstance/Logs/utils.ts
+++ b/airflow/www/static/js/dag/details/taskInstance/Logs/utils.ts
@@ -46,6 +46,7 @@ export const parseLogs = (
   if (!data) {
     return {};
   }
+
   let lines;
 
   let warning;
diff --git a/chart/templates/triggerer/triggerer-deployment.yaml b/chart/templates/triggerer/triggerer-deployment.yaml
index 0d38ac1d66..78310394e9 100644
--- a/chart/templates/triggerer/triggerer-deployment.yaml
+++ b/chart/templates/triggerer/triggerer-deployment.yaml
@@ -20,15 +20,13 @@
 #################################
 {{- if semverCompare ">=2.2.0" .Values.airflowVersion }}
 {{- if .Values.triggerer.enabled }}
-{{- /* Airflow version 2.6.0 is when triggerer log serving was introduced */ -}}
-{{- $persistence := and .Values.triggerer.persistence.enabled (semverCompare ">=2.6.0" .Values.airflowVersion) }}
 {{- $nodeSelector := or .Values.triggerer.nodeSelector .Values.nodeSelector }}
 {{- $affinity := or .Values.triggerer.affinity .Values.affinity }}
 {{- $tolerations := or .Values.triggerer.tolerations .Values.tolerations }}
 {{- $topologySpreadConstraints := or .Values.triggerer.topologySpreadConstraints .Values.topologySpreadConstraints }}
 {{- $revisionHistoryLimit := or .Values.triggerer.revisionHistoryLimit .Values.revisionHistoryLimit }}
 {{- $securityContext := include "airflowSecurityContext" (list . .Values.triggerer) }}
-kind: {{ if $persistence }}StatefulSet{{ else }}Deployment{{ end }}
+kind: Deployment
 apiVersion: apps/v1
 metadata:
   name: {{ .Release.Name }}-triggerer
@@ -46,9 +44,6 @@ metadata:
     {{- toYaml .Values.triggerer.annotations | nindent 4 }}
   {{- end }}
 spec:
-  {{- if $persistence }}
-  serviceName: {{ .Release.Name }}-triggerer
-  {{- end }}
   replicas: {{ .Values.triggerer.replicas }}
   {{- if $revisionHistoryLimit }}
   revisionHistoryLimit: {{ $revisionHistoryLimit }}
@@ -58,11 +53,7 @@ spec:
       tier: airflow
       component: triggerer
       release: {{ .Release.Name }}
-  {{- if and $persistence .Values.triggerer.updateStrategy }}
-  updateStrategy:
-    {{- toYaml .Values.triggerer.updateStrategy | nindent 4 }}
-  {{- end }}
-  {{- if and (not $persistence) (.Values.triggerer.strategy) }}
+  {{- if .Values.triggerer.strategy }}
   strategy:
     {{- toYaml .Values.triggerer.strategy | nindent 4 }}
   {{- end }}
@@ -202,12 +193,6 @@ spec:
                   {{- else }}
                   {{- include "triggerer_liveness_check_command" . | nindent 16 }}
                   {{- end }}
-        {{- /* Airflow version 2.6.0 is when triggerer log serving was introduced */ -}}
-        {{- if semverCompare ">=2.6.0" .Values.airflowVersion }}
-          ports:
-            - name: triggerer-logs
-              containerPort: {{ .Values.ports.triggererLogs }}
-        {{- end }}
         {{- if and (.Values.dags.gitSync.enabled) (not .Values.dags.persistence.enabled) }}
         {{- include "git_sync_container" . | indent 8 }}
         {{- end }}
@@ -240,29 +225,13 @@ spec:
         {{- if .Values.triggerer.extraVolumes }}
         {{- toYaml .Values.triggerer.extraVolumes | nindent 8 }}
         {{- end }}
-{{- if .Values.logs.persistence.enabled }}
+        {{- if .Values.logs.persistence.enabled }}
         - name: logs
           persistentVolumeClaim:
             claimName: {{ template "airflow_logs_volume_claim" . }}
-{{- else if not $persistence }}
+        {{- else }}
         - name: logs
           emptyDir: {}
-{{- else }}
-  volumeClaimTemplates:
-    - metadata:
-        name: logs
-        {{- if .Values.triggerer.persistence.annotations }}
-        annotations:
-          {{- toYaml .Values.triggerer.persistence.annotations | nindent 10 }}
-        {{- end }}
-      spec:
-      {{- if .Values.triggerer.persistence.storageClassName }}
-        storageClassName: {{ .Values.triggerer.persistence.storageClassName }}
-      {{- end }}
-        accessModes: ["ReadWriteOnce"]
-        resources:
-          requests:
-            storage: {{ .Values.triggerer.persistence.size }}
 {{- end }}
 {{- end }}
 {{- end }}
diff --git a/chart/templates/triggerer/triggerer-networkpolicy.yaml b/chart/templates/triggerer/triggerer-networkpolicy.yaml
deleted file mode 100644
index 2d51f2f91a..0000000000
--- a/chart/templates/triggerer/triggerer-networkpolicy.yaml
+++ /dev/null
@@ -1,58 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-##################################
-## Airflow triggerer NetworkPolicy
-##################################
-{{- /* Airflow version 2.6.0 is when triggerer log serving was introduced */ -}}
-{{- if semverCompare ">=2.6.0" .Values.airflowVersion }}
-{{- if .Values.networkPolicies.enabled }}
-{{- if .Values.triggerer.enabled }}
-apiVersion: networking.k8s.io/v1
-kind: NetworkPolicy
-metadata:
-  name: {{ .Release.Name }}-triggerer-policy
-  labels:
-    tier: airflow
-    component: airflow-triggerer-policy
-    release: {{ .Release.Name }}
-    chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
-    heritage: {{ .Release.Service }}
-{{- if or (.Values.labels) (.Values.triggerer.labels) }}
-{{- mustMerge .Values.triggerer.labels .Values.labels | toYaml | nindent 4 }}
-{{- end }}
-spec:
-  podSelector:
-    matchLabels:
-      tier: airflow
-      component: triggerer
-      release: {{ .Release.Name }}
-  policyTypes:
-  - Ingress
-  ingress:
-  - from:
-    - podSelector:
-        matchLabels:
-          tier: airflow
-          release: {{ .Release.Name }}
-          component: webserver
-    ports:
-    - protocol: TCP
-      port: {{ .Values.ports.triggererLogs }}
-{{- end }}
-{{- end }}
-{{- end }}
diff --git a/chart/templates/triggerer/triggerer-service.yaml b/chart/templates/triggerer/triggerer-service.yaml
deleted file mode 100644
index 68536b46ec..0000000000
--- a/chart/templates/triggerer/triggerer-service.yaml
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-################################
-## Airflow triggerer Service
-#################################
-{{- /* Airflow version 2.6.0 is when triggerer log serving was introduced */ -}}
-{{- if semverCompare ">=2.6.0" .Values.airflowVersion }}
-{{- if .Values.triggerer.enabled }}
-kind: Service
-apiVersion: v1
-metadata:
-  name: {{ .Release.Name }}-triggerer
-  labels:
-    tier: airflow
-    component: triggerer
-    release: {{ .Release.Name }}
-    chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
-    heritage: {{ .Release.Service }}
-{{- if or (.Values.labels) (.Values.triggerer.labels) }}
-{{- mustMerge .Values.triggerer.labels .Values.labels | toYaml | nindent 4 }}
-{{- end }}
-spec:
-  clusterIP: None
-  selector:
-    tier: airflow
-    component: triggerer
-    release: {{ .Release.Name }}
-  ports:
-    - name: triggerer-logs
-      protocol: TCP
-      port: {{ .Values.ports.triggererLogs }}
-      targetPort: {{ .Values.ports.triggererLogs }}
-{{- end }}
-{{- end }}
diff --git a/chart/values.schema.json b/chart/values.schema.json
index 803dadda77..8bd29c0f6b 100644
--- a/chart/values.schema.json
+++ b/chart/values.schema.json
@@ -2200,14 +2200,6 @@
                         "exec airflow triggerer"
                     ]
                 },
-                "updateStrategy": {
-                    "description": "Specifies the strategy used to replace old Pods by new ones when deployed as a StatefulSet.",
-                    "type": [
-                        "null",
-                        "object"
-                    ],
-                    "default": null
-                },
                 "strategy": {
                     "description": "Specifies the strategy used to replace old Pods by new ones when deployed as a Deployment.",
                     "type": [
@@ -2249,44 +2241,6 @@
                         }
                     }
                 },
-                "persistence": {
-                    "description": "Persistence configuration.",
-                    "type": "object",
-                    "additionalProperties": false,
-                    "properties": {
-                        "enabled": {
-                            "description": "Enable persistent volumes.",
-                            "type": "boolean",
-                            "default": true
-                        },
-                        "size": {
-                            "description": "Volume size for triggerer StatefulSet.",
-                            "type": "string",
-                            "default": "100Gi"
-                        },
-                        "storageClassName": {
-                            "description": "If using a custom StorageClass, pass name ref to all StatefulSets here.",
-                            "type": [
-                                "string",
-                                "null"
-                            ],
-                            "default": null
-                        },
-                        "fixPermissions": {
-                            "description": "Execute init container to chown log directory. This is currently only needed in kind, due to usage of local-path provisioner.",
-                            "type": "boolean",
-                            "default": false
-                        },
-                        "annotations": {
-                            "description": "Annotations to add to triggerer volumes.",
-                            "type": "object",
-                            "default": {},
-                            "additionalProperties": {
-                                "type": "string"
-                            }
-                        }
-                    }
-                },
                 "resources": {
                     "description": "Resources for triggerer pods.",
                     "type": "object",
@@ -4995,11 +4949,6 @@
                     "type": "integer",
                     "default": 8793
                 },
-                "triggererLogs": {
-                    "description": "Triggerer logs port.",
-                    "type": "integer",
-                    "default": 8794
-                },
                 "redisDB": {
                     "description": "Redis port.",
                     "type": "integer",
diff --git a/chart/values.yaml b/chart/values.yaml
index 60a9575917..339bcb844d 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -1068,9 +1068,7 @@ triggerer:
   # Args to use when running Airflow triggerer (templated).
   args: ["bash", "-c", "exec airflow triggerer"]
 
-  # Update Strategy when triggerer is deployed as a StatefulSet
-  updateStrategy: ~
-  # Update Strategy when triggerer is deployed as a Deployment
+  # Update Strategy for triggerers
   strategy:
     rollingUpdate:
       maxSurge: "100%"
@@ -1102,20 +1100,6 @@ triggerer:
   #  fsGroup: 0
   #  runAsGroup: 0
 
-  persistence:
-    # Enable persistent volumes
-    enabled: true
-    # Volume size for triggerer StatefulSet
-    size: 100Gi
-    # If using a custom storageClass, pass name ref to all statefulSets here
-    storageClassName:
-    # Execute init container to chown log directory.
-    # This is currently only needed in kind, due to usage
-    # of local-path provisioner.
-    fixPermissions: false
-    # Annotations to add to triggerer volumes
-    annotations: {}
-
   resources: {}
   #  limits:
   #   cpu: 100m
@@ -1650,7 +1634,6 @@ ports:
   flowerUI: 5555
   airflowUI: 8080
   workerLogs: 8793
-  triggererLogs: 8794
   redisDB: 6379
   statsdIngest: 9125
   statsdScrape: 9102
diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
index c7d5819b31..e5b0335651 100644
--- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
+++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst
@@ -21,14 +21,17 @@ Logging for Tasks
 =================
 
 Airflow writes logs for tasks in a way that allows you to see the logs for each task separately in the Airflow UI.
-Core Airflow provides an interface FileTaskHandler, which writes task logs to file, and includes a mechanism to serve them from workers while tasks are running. The Apache Airflow Community also releases providers for many
+Core Airflow implements writing and serving logs locally. However, you can also write logs to remote
+services via community providers, or write your own loggers.
+
+Below we describe the local task logging. The Apache Airflow Community also releases providers for many
 services (:doc:`apache-airflow-providers:index`) and some of them provide handlers that extend the logging
 capability of Apache Airflow. You can see all of these providers in :doc:`apache-airflow-providers:core-extensions/logging`.
 
-Configuring logging
--------------------
+Writing logs locally
+--------------------
 
-For the default handler, FileTaskHandler, you can specify the directory to place log files in ``airflow.cfg`` using
+You can specify the directory to place log files in ``airflow.cfg`` using
 ``base_log_folder``. By default, logs are placed in the ``AIRFLOW_HOME``
 directory.
 
@@ -44,14 +47,11 @@ These patterns can be adjusted by :ref:`config:logging__log_filename_template`.
 
 In addition, you can supply a remote location to store current logs and backups.
 
-Interleaving of logs
---------------------
-
-Airflow's remote task logging handlers can broadly be separated into two categories: streaming handlers (such as ElasticSearch, AWS Cloudwatch, and GCP operations logging, formerly stackdriver) and blob storage handlers (e.g. S3, GCS, WASB).
-
-For blob storage handlers, depending on the state of the task, logs could be in a lot of different places and in multiple different files.  For this reason, we need to check all locations and interleave what we find.  To do this we need to be able to parse the timestamp for each line.  If you are using a custom formatter you may need to override the default parser by providing a callable name at Airflow setting ``[logging] interleave_timestamp_parser``.
+In the Airflow UI, remote logs take precedence over local logs when remote logging is enabled. If remote logs
+cannot be found or accessed, local logs will be displayed. Note that logs
+are only sent to remote storage once a task is complete (including failure). In other words, remote logs for
+running tasks are unavailable (but local logs are available).
 
-For streaming handlers, no matter the task phase or location of execution, all log messages can be sent to the logging service with the same identifier so generally speaking there isn't a need to check multiple sources and interleave.
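To make the interleaving note above concrete, ``[logging] interleave_timestamp_parser`` names a callable by dotted path (e.g. a hypothetical ``my_company.logging_utils.parse_timestamp``). A minimal sketch, assuming the callable receives a single log line and returns the datetime parsed from its leading timestamp; the line format shown is only an example:

# Hedged sketch of a custom parser for [logging] interleave_timestamp_parser.
# Assumes lines look like "2023-02-11 06:23:54,123 | INFO | message".
from datetime import datetime


def parse_timestamp(line: str) -> datetime:
    # Everything before the first " | " separator is the timestamp.
    stamp = line.split(" | ", 1)[0]
    return datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S,%f")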
 
 Troubleshooting
 ---------------
@@ -122,33 +122,15 @@ When using remote logging, you can configure Airflow to show a link to an extern
 
 Some external systems require specific configuration in Airflow for redirection to work but others do not.
 
-Serving logs from workers and triggerer
----------------------------------------
+Serving logs from workers
+-------------------------
 
-Most task handlers send logs upon completion of a task. In order to view logs in real time, Airflow starts an HTTP server to serve the logs in the following cases:
+Most task handlers send logs upon completion of a task. In order to view logs in real time, Airflow automatically starts an HTTP server to serve the logs in the following cases:
 
 - If ``SequentialExecutor`` or ``LocalExecutor`` is used, then when ``airflow scheduler`` is running.
 - If ``CeleryExecutor`` is used, then when ``airflow worker`` is running.
 
-In triggerer, logs are served unless the service is started with option ``--skip-serve-logs``.
-
-The server runs on the port specified by the ``worker_log_server_port`` option in the ``[logging]`` section, and by the ``triggerer_log_server_port`` option for the triggerer.  Defaults are 8793 and 8794, respectively.
+The server runs on the port specified by the ``worker_log_server_port`` option in the ``[logging]`` section. By default, it is ``8793``.
 Communication between the webserver and the worker is signed with the key specified by ``secret_key`` option  in ``[webserver]`` section. You must ensure that the key matches so that communication can take place without problems.
 
 We are using `Gunicorn <https://gunicorn.org/>`__ as a WSGI server. Its configuration options can be overridden with the ``GUNICORN_CMD_ARGS`` env variable. For details, see `Gunicorn settings <https://docs.gunicorn.org/en/latest/settings.html#settings>`__.
-
-Implementing a custom file task handler
----------------------------------------
-
-.. note:: This is an advanced topic and most users should be able to just use an existing handler from :doc:`apache-airflow-providers:core-extensions/logging`.
-
-In our providers we have a healthy variety of options with all the major cloud providers.  But should you need to implement logging with a different service, and should you then decide to implement a custom FileTaskHandler, there are a few settings to be aware of, particularly in the context of trigger logging.
-
-Triggers require a shift in the way that logging is set up.  In contrast with tasks, many triggers run in the same process, and because triggers run in asyncio we have to be mindful of not introducing blocking calls through the logging handler.  And because handler behavior varies (some write to file, some upload to blob storage, some send messages over the network as they arrive, some do so in a thread), we need some way to let the triggerer know how to use them.
-
-To accomplish this we have a few attributes that may be set on the handler, either on the instance or the class.  Inheritance is not respected for these parameters, because subclasses of FileTaskHandler may differ from it in the relevant characteristics.  These params are described below (a short illustrative sketch follows the list):
-
-- ``trigger_should_wrap``: Controls whether this handler should be wrapped by TriggererHandlerWrapper.  This is necessary when each instance of the handler creates a file handler that it writes all messages to.
-- ``trigger_should_queue``: Controls whether the triggerer should put a QueueListener between the event loop and the handler, to ensure blocking IO in the handler does not disrupt the event loop.
-- ``trigger_send_end_marker``: Controls whether an END signal should be sent to the logger when trigger completes. It is used to tell the wrapper to close and remove the individual file handler specific to the trigger that just completed.
-- ``trigger_supported``: If ``trigger_should_wrap`` and ``trigger_should_queue`` are not True, we generally assume that the handler does not support triggers.  But if in this case the handler has ``trigger_supported`` set to True, then we'll still move the handler to root at triggerer start so that it will process trigger messages.  Essentially, this should be true for handlers that "natively" support triggers. One such example of this is the StackdriverTaskHandler.
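A minimal sketch of a handler declaring the attributes described above (the class name and flag values are illustrative; the write/read logic a real handler needs is omitted):

# Hedged sketch only: shows where the trigger-logging flags live on a custom
# FileTaskHandler subclass. MyServiceTaskHandler is a made-up name.
from airflow.utils.log.file_task_handler import FileTaskHandler


class MyServiceTaskHandler(FileTaskHandler):
    # Wrap this handler with TriggererHandlerWrapper so each trigger gets its
    # own individual file handler instance.
    trigger_should_wrap = True
    # Put a QueueListener between the asyncio event loop and this handler,
    # since its emit() does blocking IO.
    trigger_should_queue = True
    # Send an END marker when a trigger completes so the wrapper can close
    # and remove that trigger's individual handler.
    trigger_send_end_marker = True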
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py
index 422830eb6e..023b62b484 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -166,10 +166,10 @@ class TestGetLog:
         )
         assert (
             response.json["content"]
-            == f"[('localhost', '*** Found local files:\\n***   * {expected_filename}\\nLog for testing.')]"
+            == f"[('localhost', '*** Reading local file: {expected_filename}\\nLog for testing.')]"
         )
         info = serializer.loads(response.json["continuation_token"])
-        assert info == {"end_of_log": True, "log_pos": 16}
+        assert info == {"end_of_log": True, "log_pos": 41 + len(expected_filename)}
         assert 200 == response.status_code
 
     @pytest.mark.parametrize(
@@ -203,7 +203,7 @@ class TestGetLog:
         assert 200 == response.status_code
         assert (
             response.data.decode("utf-8")
-            == f"localhost\n*** Found local files:\n***   * {expected_filename}\nLog for testing.\n"
+            == f"localhost\n*** Reading local file: {expected_filename}\nLog for testing.\n"
         )
 
     @pytest.mark.parametrize(
@@ -244,7 +244,7 @@ class TestGetLog:
         assert 200 == response.status_code
         assert (
             response.data.decode("utf-8")
-            == f"localhost\n*** Found local files:\n***   * {expected_filename}\nLog for testing.\n"
+            == f"localhost\n*** Reading local file: {expected_filename}\nLog for testing.\n"
         )
 
     def test_get_logs_response_with_ti_equal_to_none(self):
diff --git a/tests/charts/test_basic_helm_chart.py b/tests/charts/test_basic_helm_chart.py
index 2242ba1efe..171a012d4d 100644
--- a/tests/charts/test_basic_helm_chart.py
+++ b/tests/charts/test_basic_helm_chart.py
@@ -39,11 +39,9 @@ class TestBaseChartTest:
     def _get_object_count(self, version):
         if version == "2.3.2":
             return OBJECT_COUNT_IN_BASIC_DEPLOYMENT + 1
-        elif version == "2.6.0":
-            return OBJECT_COUNT_IN_BASIC_DEPLOYMENT + 1
         return OBJECT_COUNT_IN_BASIC_DEPLOYMENT
 
-    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "2.6.0", "default"])
+    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"])
     def test_basic_deployments(self, version):
         expected_object_count_in_basic_deployment = self._get_object_count(version)
         k8s_objects = render_chart(
@@ -62,7 +60,10 @@ class TestBaseChartTest:
         list_of_kind_names_tuples = {
             (k8s_object["kind"], k8s_object["metadata"]["name"]) for k8s_object in k8s_objects
         }
-        expected = {
+        if version == "2.3.2":
+            assert ("Secret", "test-basic-airflow-result-backend") in list_of_kind_names_tuples
+            list_of_kind_names_tuples.remove(("Secret", "test-basic-airflow-result-backend"))
+        assert list_of_kind_names_tuples == {
             ("ServiceAccount", "test-basic-create-user-job"),
             ("ServiceAccount", "test-basic-migrate-database-job"),
             ("ServiceAccount", "test-basic-redis"),
@@ -91,7 +92,7 @@ class TestBaseChartTest:
             ("Service", "test-basic-worker"),
             ("Deployment", "test-basic-scheduler"),
             ("Deployment", "test-basic-statsd"),
-            (self.default_trigger_obj(version), "test-basic-triggerer"),
+            ("Deployment", "test-basic-triggerer"),
             ("Deployment", "test-basic-webserver"),
             ("StatefulSet", "test-basic-postgresql"),
             ("StatefulSet", "test-basic-redis"),
@@ -99,11 +100,6 @@ class TestBaseChartTest:
             ("Job", "test-basic-create-user"),
             ("Job", "test-basic-run-airflow-migrations"),
         }
-        if version == "2.3.2":
-            expected.add(("Secret", "test-basic-airflow-result-backend"))
-        if version == "2.6.0":
-            expected.add(("Service", "test-basic-triggerer"))
-        assert list_of_kind_names_tuples == expected
         assert expected_object_count_in_basic_deployment == len(k8s_objects)
         for k8s_object in k8s_objects:
             labels = jmespath.search("metadata.labels", k8s_object) or {}
@@ -118,7 +114,7 @@ class TestBaseChartTest:
                 "test-label"
             ), f"Missing label test-label on {k8s_name}. Current labels: {labels}"
 
-    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "2.6.0", "default"])
+    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"])
     def test_basic_deployment_with_standalone_dag_processor(self, version):
         # Dag Processor creates two extra objects compared to the basic deployment
         object_count_in_basic_deployment = self._get_object_count(version)
@@ -140,7 +136,10 @@ class TestBaseChartTest:
         list_of_kind_names_tuples = {
             (k8s_object["kind"], k8s_object["metadata"]["name"]) for k8s_object in k8s_objects
         }
-        expected = {
+        if version == "2.3.2":
+            assert ("Secret", "test-basic-airflow-result-backend") in list_of_kind_names_tuples
+            list_of_kind_names_tuples.remove(("Secret", "test-basic-airflow-result-backend"))
+        assert list_of_kind_names_tuples == {
             ("ServiceAccount", "test-basic-create-user-job"),
             ("ServiceAccount", "test-basic-migrate-database-job"),
             ("ServiceAccount", "test-basic-redis"),
@@ -170,7 +169,7 @@ class TestBaseChartTest:
             ("Service", "test-basic-worker"),
             ("Deployment", "test-basic-scheduler"),
             ("Deployment", "test-basic-statsd"),
-            (self.default_trigger_obj(version), "test-basic-triggerer"),
+            ("Deployment", "test-basic-triggerer"),
             ("Deployment", "test-basic-dag-processor"),
             ("Deployment", "test-basic-webserver"),
             ("StatefulSet", "test-basic-postgresql"),
@@ -179,11 +178,6 @@ class TestBaseChartTest:
             ("Job", "test-basic-create-user"),
             ("Job", "test-basic-run-airflow-migrations"),
         }
-        if version == "2.3.2":
-            expected.add(("Secret", "test-basic-airflow-result-backend"))
-        if version == "2.6.0":
-            expected.add(("Service", "test-basic-triggerer"))
-        assert list_of_kind_names_tuples == expected
         assert expected_object_count_with_standalone_scheduler == len(k8s_objects)
         for k8s_object in k8s_objects:
             labels = jmespath.search("metadata.labels", k8s_object) or {}
@@ -345,7 +339,7 @@ class TestBaseChartTest:
             (f"{release_name}-worker", "Service", "worker"),
             (f"{release_name}-worker", "StatefulSet", "worker"),
             (f"{release_name}-worker-policy", "NetworkPolicy", "airflow-worker-policy"),
-            # (f"{release_name}-triggerer", "StatefulSet", "triggerer"),
+            (f"{release_name}-triggerer", "Deployment", "triggerer"),
             (f"{release_name}-dag-processor", "Deployment", "dag-processor"),
             (f"{release_name}-logs", "PersistentVolumeClaim", "logs-pvc"),
             (f"{release_name}-dags", "PersistentVolumeClaim", "dags-pvc"),
@@ -539,7 +533,3 @@ class TestBaseChartTest:
             "postgresql://postgres:postgres@overrideName:5432/postgres?sslmode=disable"
             == base64.b64decode(doc["data"]["connection"]).decode("utf-8")
         )
-
-    @staticmethod
-    def default_trigger_obj(version):
-        return "StatefulSet" if version == "2.6.0" else "Deployment"
diff --git a/tests/charts/test_extra_env_env_from.py b/tests/charts/test_extra_env_env_from.py
index 0dc5ff845a..b32b6fae5e 100644
--- a/tests/charts/test_extra_env_env_from.py
+++ b/tests/charts/test_extra_env_env_from.py
@@ -59,7 +59,7 @@ PARAMS = [
         ),
     ),
     (
-        ("StatefulSet", f"{RELEASE_NAME}-triggerer"),
+        ("Deployment", f"{RELEASE_NAME}-triggerer"),
         (
             "spec.template.spec.initContainers[0]",
             "spec.template.spec.containers[0]",
@@ -80,7 +80,6 @@ class TestExtraEnvEnvFrom:
     def setup_class(cls) -> None:
         values_str = textwrap.dedent(
             """
-            airflowVersion: "2.6.0"
             flower:
               enabled: true
             extraEnvFrom: |
diff --git a/tests/charts/test_rbac.py b/tests/charts/test_rbac.py
index 1f155d501a..ca78b1e27d 100644
--- a/tests/charts/test_rbac.py
+++ b/tests/charts/test_rbac.py
@@ -16,8 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-from copy import copy
-
 import jmespath
 import pytest
 
@@ -43,6 +41,7 @@ DEPLOYMENT_NO_RBAC_NO_SA_KIND_NAME_TUPLES = [
     ("Deployment", "test-rbac-webserver"),
     ("Deployment", "test-rbac-flower"),
     ("Deployment", "test-rbac-pgbouncer"),
+    ("Deployment", "test-rbac-triggerer"),
     ("StatefulSet", "test-rbac-postgresql"),
     ("StatefulSet", "test-rbac-redis"),
     ("StatefulSet", "test-rbac-worker"),
@@ -113,19 +112,14 @@ class TestRBAC:
             values["airflowVersion"] = version
         return values
 
-    @staticmethod
-    def _get_object_tuples(version):
-        tuples = copy(DEPLOYMENT_NO_RBAC_NO_SA_KIND_NAME_TUPLES)
-        if version == "2.6.0":
-            tuples.append(("Service", "test-rbac-triggerer"))
-            tuples.append(("StatefulSet", "test-rbac-triggerer"))
-        else:
-            tuples.append(("Deployment", "test-rbac-triggerer"))
+    def _get_object_count(self, version):
         if version == "2.3.2":
-            tuples.append(("Secret", "test-rbac-airflow-result-backend"))
-        return tuples
+            return [
+                ("Secret", "test-rbac-airflow-result-backend")
+            ] + DEPLOYMENT_NO_RBAC_NO_SA_KIND_NAME_TUPLES
+        return DEPLOYMENT_NO_RBAC_NO_SA_KIND_NAME_TUPLES
 
-    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "2.6.0", "default"])
+    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"])
     def test_deployments_no_rbac_no_sa(self, version):
         k8s_objects = render_chart(
             "test-rbac",
@@ -161,9 +155,9 @@ class TestRBAC:
         list_of_kind_names_tuples = [
             (k8s_object["kind"], k8s_object["metadata"]["name"]) for k8s_object in k8s_objects
         ]
-        assert sorted(list_of_kind_names_tuples) == sorted(self._get_object_tuples(version))
+        assert sorted(list_of_kind_names_tuples) == sorted(self._get_object_count(version))
 
-    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "2.6.0", "default"])
+    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"])
     def test_deployments_no_rbac_with_sa(self, version):
         k8s_objects = render_chart(
             "test-rbac",
@@ -181,10 +175,10 @@ class TestRBAC:
         list_of_kind_names_tuples = [
             (k8s_object["kind"], k8s_object["metadata"]["name"]) for k8s_object in k8s_objects
         ]
-        real_list_of_kind_names = self._get_object_tuples(version) + SERVICE_ACCOUNT_NAME_TUPLES
+        real_list_of_kind_names = self._get_object_count(version) + SERVICE_ACCOUNT_NAME_TUPLES
         assert sorted(list_of_kind_names_tuples) == sorted(real_list_of_kind_names)
 
-    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "2.6.0", "default"])
+    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"])
     def test_deployments_with_rbac_no_sa(self, version):
         k8s_objects = render_chart(
             "test-rbac",
@@ -219,10 +213,10 @@ class TestRBAC:
         list_of_kind_names_tuples = [
             (k8s_object["kind"], k8s_object["metadata"]["name"]) for k8s_object in k8s_objects
         ]
-        real_list_of_kind_names = self._get_object_tuples(version) + RBAC_ENABLED_KIND_NAME_TUPLES
+        real_list_of_kind_names = self._get_object_count(version) + RBAC_ENABLED_KIND_NAME_TUPLES
         assert sorted(list_of_kind_names_tuples) == sorted(real_list_of_kind_names)
 
-    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "2.6.0", "default"])
+    @pytest.mark.parametrize("version", ["2.3.2", "2.4.0", "default"])
     def test_deployments_with_rbac_with_sa(self, version):
         k8s_objects = render_chart(
             "test-rbac",
@@ -240,7 +234,7 @@ class TestRBAC:
             (k8s_object["kind"], k8s_object["metadata"]["name"]) for k8s_object in k8s_objects
         ]
         real_list_of_kind_names = (
-            self._get_object_tuples(version) + SERVICE_ACCOUNT_NAME_TUPLES + RBAC_ENABLED_KIND_NAME_TUPLES
+            self._get_object_count(version) + SERVICE_ACCOUNT_NAME_TUPLES + RBAC_ENABLED_KIND_NAME_TUPLES
         )
         assert sorted(list_of_kind_names_tuples) == sorted(real_list_of_kind_names)
 
diff --git a/tests/charts/test_triggerer.py b/tests/charts/test_triggerer.py
index 7beb872dca..8e25034550 100644
--- a/tests/charts/test_triggerer.py
+++ b/tests/charts/test_triggerer.py
@@ -362,10 +362,7 @@ class TestTriggerer:
     )
     def test_logs_persistence_changes_volume(self, log_persistence_values, expected_volume):
         docs = render_chart(
-            values={
-                "triggerer": {"persistence": {"enabled": False}},
-                "logs": {"persistence": log_persistence_values},
-            },
+            values={"logs": {"persistence": log_persistence_values}},
             show_only=["templates/triggerer/triggerer-deployment.yaml"],
         )
 
@@ -410,44 +407,20 @@ class TestTriggerer:
         assert jmespath.search("spec.template.spec.containers[0].resources", docs[0]) == {}
 
     @pytest.mark.parametrize(
-        "persistence, update_strategy, expected_update_strategy",
-        [
-            (False, None, None),
-            (True, {"rollingUpdate": {"partition": 0}}, {"rollingUpdate": {"partition": 0}}),
-            (True, None, None),
-        ],
-    )
-    def test_update_strategy(self, persistence, update_strategy, expected_update_strategy):
-        docs = render_chart(
-            values={
-                "airflowVersion": "2.6.0",
-                "executor": "CeleryExecutor",
-                "triggerer": {
-                    "persistence": {"enabled": persistence},
-                    "updateStrategy": update_strategy,
-                },
-            },
-            show_only=["templates/triggerer/triggerer-deployment.yaml"],
-        )
-
-        assert expected_update_strategy == jmespath.search("spec.updateStrategy", docs[0])
-
-    @pytest.mark.parametrize(
-        "persistence, strategy, expected_strategy",
+        "strategy, expected_strategy",
         [
-            (True, None, None),
+            (None, None),
             (
-                False,
                 {"rollingUpdate": {"maxSurge": "100%", "maxUnavailable": "50%"}},
                 {"rollingUpdate": {"maxSurge": "100%", "maxUnavailable": "50%"}},
             ),
-            (False, None, None),
         ],
     )
-    def test_strategy(self, persistence, strategy, expected_strategy):
+    def test_strategy(self, strategy, expected_strategy):
+        """strategy should be used when we aren't using both LocalExecutor and workers.persistence"""
         docs = render_chart(
             values={
-                "triggerer": {"persistence": {"enabled": persistence}, "strategy": strategy},
+                "triggerer": {"strategy": strategy},
             },
             show_only=["templates/triggerer/triggerer-deployment.yaml"],
         )
diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py
index a93dbe9caf..7398eb6c54 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -51,7 +51,7 @@ def test_is_single_threaded_default_value():
 def test_get_task_log():
     executor = BaseExecutor()
     ti = TaskInstance(task=BaseOperator(task_id="dummy"))
-    assert executor.get_task_log(ti=ti) == ([], [])
+    assert executor.get_task_log(ti=ti) is None
 
 
 def test_serve_logs_default_value():
diff --git a/tests/executors/test_celery_kubernetes_executor.py b/tests/executors/test_celery_kubernetes_executor.py
index ebf4e33c5f..fb5ccd1499 100644
--- a/tests/executors/test_celery_kubernetes_executor.py
+++ b/tests/executors/test_celery_kubernetes_executor.py
@@ -182,15 +182,15 @@ class TestCeleryKubernetesExecutor:
         cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock)
         simple_task_instance = mock.MagicMock()
         simple_task_instance.queue = KUBERNETES_QUEUE
-        cke.get_task_log(ti=simple_task_instance)
-        k8s_executor_mock.get_task_log.assert_called_once_with(ti=simple_task_instance)
+        cke.get_task_log(ti=simple_task_instance, log="")
+        k8s_executor_mock.get_task_log.assert_called_once_with(ti=simple_task_instance, log=mock.ANY)
 
         k8s_executor_mock.reset_mock()
 
         simple_task_instance.queue = "test-queue"
-        log = cke.get_task_log(ti=simple_task_instance)
+        log = cke.get_task_log(ti=simple_task_instance, log="")
         k8s_executor_mock.get_task_log.assert_not_called()
-        assert log == ([], [])
+        assert log is None
 
     def test_get_event_buffer(self):
         celery_executor_mock = mock.MagicMock()
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 954ba9c7d0..f040eb8d07 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -1219,7 +1219,7 @@ class TestKubernetesExecutor:
         assert ti0.state == State.SCHEDULED
         assert ti1.state == State.QUEUED
 
-    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
+    @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
     def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operator):
         """fetch task log from pod"""
         mock_kube_client = mock_get_kube_client.return_value
@@ -1231,21 +1231,20 @@ class TestKubernetesExecutor:
         ti = create_task_instance_of_operator(EmptyOperator, dag_id="test_k8s_log_dag", task_id="test_task")
 
         executor = KubernetesExecutor()
-        messages, logs = executor.get_task_log(ti=ti)
+        log = executor.get_task_log(ti=ti, log="test_init_log")
 
         mock_kube_client.read_namespaced_pod_log.assert_called_once()
-        assert "Trying to get logs (last 100 lines) from worker pod " in messages
-        assert logs[0] == "a_\nb_\nc_"
+        assert "test_init_log" in log
+        assert "Trying to get logs (last 100 lines) from worker pod" in log
+        assert "a_b_c" in log
 
         mock_kube_client.reset_mock()
         mock_kube_client.read_namespaced_pod_log.side_effect = Exception("error_fetching_pod_log")
 
-        messages, logs = executor.get_task_log(ti=ti)
-        assert logs == [""]
-        assert messages == [
-            "Trying to get logs (last 100 lines) from worker pod ",
-            "Reading from k8s pod logs failed: error_fetching_pod_log",
-        ]
+        log = executor.get_task_log(ti=ti, log="test_init_log")
+        assert len(log) == 2
+        assert "error_fetching_pod_log" in log[0]
+        assert log[1]["end_of_log"]
 
     def test_supports_pickling(self):
         assert KubernetesExecutor.supports_pickling
diff --git a/tests/executors/test_local_kubernetes_executor.py b/tests/executors/test_local_kubernetes_executor.py
index 09fb55ca8a..694acee0c9 100644
--- a/tests/executors/test_local_kubernetes_executor.py
+++ b/tests/executors/test_local_kubernetes_executor.py
@@ -89,18 +89,21 @@ class TestLocalKubernetesExecutor:
     def test_log_is_fetched_from_k8s_executor_only_for_k8s_queue(self):
         local_executor_mock = mock.MagicMock()
         k8s_executor_mock = mock.MagicMock()
+
+        KUBERNETES_QUEUE = conf.get("local_kubernetes_executor", "kubernetes_queue")
         LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)
         local_k8s_exec = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)
         simple_task_instance = mock.MagicMock()
-        simple_task_instance.queue = conf.get("local_kubernetes_executor", "kubernetes_queue")
-        local_k8s_exec.get_task_log(ti=simple_task_instance)
-        k8s_executor_mock.get_task_log.assert_called_once_with(ti=simple_task_instance)
+        simple_task_instance.queue = KUBERNETES_QUEUE
+        local_k8s_exec.get_task_log(ti=simple_task_instance, log="")
+        k8s_executor_mock.get_task_log.assert_called_once_with(ti=simple_task_instance, log=mock.ANY)
+
         k8s_executor_mock.reset_mock()
+
         simple_task_instance.queue = "test-queue"
-        messages, logs = local_k8s_exec.get_task_log(ti=simple_task_instance)
+        log = local_k8s_exec.get_task_log(ti=simple_task_instance, log="")
         k8s_executor_mock.get_task_log.assert_not_called()
-        assert logs == []
-        assert messages == []
+        assert log is None
 
     def test_send_callback(self):
         local_executor_mock = mock.MagicMock()
diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py
index f560caeed1..5fa64c9c47 100644
--- a/tests/jobs/test_triggerer_job.py
+++ b/tests/jobs/test_triggerer_job.py
@@ -19,16 +19,12 @@ from __future__ import annotations
 
 import asyncio
 import datetime
-import importlib
 import time
 from threading import Thread
-from unittest.mock import patch
 
 import pytest
 
-from airflow.config_templates import airflow_local_settings
-from airflow.jobs.triggerer_job import TriggererJob, TriggerRunner, setup_queue_listener
-from airflow.logging_config import configure_logging
+from airflow.jobs.triggerer_job import TriggererJob, TriggerRunner
 from airflow.models import DagModel, DagRun, TaskInstance, Trigger
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
@@ -36,11 +32,8 @@ from airflow.triggers.base import TriggerEvent
 from airflow.triggers.temporal import TimeDeltaTrigger
 from airflow.triggers.testing import FailureTrigger, SuccessTrigger
 from airflow.utils import timezone
-from airflow.utils.log.logging_mixin import RedirectStdHandler
-from airflow.utils.log.trigger_handler import LocalQueueHandler
 from airflow.utils.session import create_session
 from airflow.utils.state import State, TaskInstanceState
-from tests.core.test_logging_config import reset_logging
 from tests.test_utils.db import clear_db_dags, clear_db_runs
 
 
@@ -460,50 +453,3 @@ def test_invalid_trigger(session, dag_maker):
     assert task_instance.next_method == "__fail__"
     assert task_instance.next_kwargs["error"] == "Trigger failure"
     assert task_instance.next_kwargs["traceback"][-1] == "ModuleNotFoundError: No module named 'fake'\n"
-
-
-@pytest.mark.parametrize("should_wrap", (True, False))
-@patch("airflow.jobs.triggerer_job.configure_trigger_log_handler")
-def test_handler_config_respects_donot_wrap(mock_configure, should_wrap):
-    from airflow.jobs import triggerer_job
-
-    triggerer_job.DISABLE_WRAPPER = not should_wrap
-    TriggererJob()
-    if should_wrap:
-        mock_configure.assert_called()
-    else:
-        mock_configure.assert_not_called()
-
-
-@patch("airflow.jobs.triggerer_job.setup_queue_listener")
-def test_triggerer_job_always_creates_listener(mock_setup):
-    mock_setup.assert_not_called()
-    TriggererJob()
-    mock_setup.assert_called()
-
-
-def test_queue_listener():
-    """
-    When listener func called, root handlers should be moved to queue listener
-    and replaced with queuehandler.
-    """
-    reset_logging()
-    importlib.reload(airflow_local_settings)
-    configure_logging()
-
-    def non_pytest_handlers(val):
-        return [h for h in val if "pytest" not in h.__module__]
-
-    import logging
-
-    log = logging.getLogger()
-    handlers = non_pytest_handlers(log.handlers)
-    assert len(handlers) == 1
-    handler = handlers[0]
-    assert handler.__class__ == RedirectStdHandler
-    listener = setup_queue_listener()
-    assert handler not in non_pytest_handlers(log.handlers)
-    qh = log.handlers[-1]
-    assert qh.__class__ == LocalQueueHandler
-    assert qh.queue == listener.queue
-    listener.stop()
diff --git a/tests/jobs/test_triggerer_job_logging.py b/tests/jobs/test_triggerer_job_logging.py
deleted file mode 100644
index 27a641d8e2..0000000000
--- a/tests/jobs/test_triggerer_job_logging.py
+++ /dev/null
@@ -1,445 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from __future__ import annotations
-
-import importlib
-import logging
-import warnings
-
-import pytest
-
-from airflow.config_templates import airflow_local_settings
-from airflow.jobs import triggerer_job
-from airflow.logging_config import configure_logging
-from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler
-from airflow.utils.log.file_task_handler import FileTaskHandler
-from airflow.utils.log.logging_mixin import RedirectStdHandler
-from airflow.utils.log.trigger_handler import DropTriggerLogsFilter, TriggererHandlerWrapper
-from tests.test_utils.config import conf_vars
-
-
-def non_pytest_handlers(val):
-    return [h for h in val if "pytest" not in h.__module__]
-
-
-def assert_handlers(logger, *classes):
-    handlers = non_pytest_handlers(logger.handlers)
-    assert [x.__class__ for x in handlers] == list(classes or [])
-    return handlers
-
-
-def clear_logger_handlers(log):
-    for h in log.handlers[:]:
-        if "pytest" not in h.__module__:
-            log.removeHandler(h)
-
-
-@pytest.fixture(autouse=True)
-def reload_triggerer_job():
-    importlib.reload(triggerer_job)
-
-
-def test_configure_trigger_log_handler_file():
-    """
-    root logger: RedirectStdHandler
-    task: FTH
-    result: wrap
-
-    """
-    # reset logging
-    root_logger = logging.getLogger()
-    clear_logger_handlers(root_logger)
-    configure_logging()
-
-    # before config
-    assert_handlers(root_logger, RedirectStdHandler)
-
-    # default task logger
-    task_logger = logging.getLogger("airflow.task")
-    task_handlers = assert_handlers(task_logger, FileTaskHandler)
-
-    # not yet configured to use wrapper
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is False
-
-    triggerer_job.configure_trigger_log_handler()
-    # after config
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is True
-    root_handlers = assert_handlers(root_logger, RedirectStdHandler, TriggererHandlerWrapper)
-    assert root_handlers[1].base_handler == task_handlers[0]
-    # other handlers have DropTriggerLogsFilter
-    assert root_handlers[0].filters[1].__class__ == DropTriggerLogsFilter
-    # no filters on wrapper handler
-    assert root_handlers[1].filters == []
-    # wrapper handler uses handler from airflow.task
-    assert root_handlers[1].base_handler.__class__ == FileTaskHandler
-
-
-def test_configure_trigger_log_handler_s3():
-    """
-    root logger: RedirectStdHandler
-    task: S3TH
-    result: wrap
-    """
-    with conf_vars(
-        {
-            ("logging", "remote_logging"): "True",
-            ("logging", "remote_log_conn_id"): "some_aws",
-            ("logging", "remote_base_log_folder"): "s3://some-folder",
-        }
-    ):
-        importlib.reload(airflow_local_settings)
-        configure_logging()
-
-    # before config
-    root_logger = logging.getLogger()
-    assert_handlers(root_logger, RedirectStdHandler)
-    # default task logger
-    task_logger = logging.getLogger("airflow.task")
-    task_handlers = assert_handlers(task_logger, S3TaskHandler)
-    # not yet configured to use wrapper
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is False
-
-    triggerer_job.configure_trigger_log_handler()
-    # after config
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is True
-    handlers = assert_handlers(root_logger, RedirectStdHandler, TriggererHandlerWrapper)
-    assert handlers[1].base_handler == task_handlers[0]
-    # other handlers have DropTriggerLogsFilter
-    assert handlers[0].filters[1].__class__ == DropTriggerLogsFilter
-    # no filters on wrapper handler
-    assert handlers[1].filters == []
-    # wrapper handler uses handler from airflow.task
-    assert handlers[1].base_handler.__class__ == S3TaskHandler
-
-
-class OldFileTaskHandler(FileTaskHandler):
-    """Handler that hasn't been updated to support triggerer"""
-
-    def _read(self, ti, try_number, metadata=None):
-        super()._read(self, ti, try_number, metadata)
-
-
-non_file_task_handler = {
-    "version": 1,
-    "handlers": {"task": {"class": "logging.Handler"}},
-    "loggers": {"airflow.task": {"handlers": ["task"]}},
-}
-
-old_file_task_handler = {
-    "version": 1,
-    "handlers": {
-        "task": {
-            "class": "tests.jobs.test_triggerer_job_logging.OldFileTaskHandler",
-            "base_log_folder": "hello",
-        }
-    },
-    "loggers": {"airflow.task": {"handlers": ["task"]}},
-}
-
-not_supported_message = [
-    "Handler OldFileTaskHandler does not support individual trigger logging. "
-    "Please check the release notes for your provider to see if a newer version "
-    "supports individual trigger logging.",
-    "Could not find log handler suitable for individual trigger logging.",
-]
-not_found_message = ["Could not find log handler suitable for individual trigger logging."]
-
-
-@pytest.mark.parametrize(
-    "cfg, cls, msg",
-    [
-        ("old_file_task_handler", OldFileTaskHandler, not_supported_message),
-        ("non_file_task_handler", logging.Handler, not_found_message),
-    ],
-)
-def test_configure_trigger_log_handler_not_file_task_handler(cfg, cls, msg):
-    """
-    No root handler configured.
-    When a non-FileTaskHandler is configured, don't modify.
-    When an incompatible subclass of FileTaskHandler is configured, don't modify.
-    """
-    # reset handlers
-    root_logger = logging.getLogger()
-    clear_logger_handlers(root_logger)
-
-    with conf_vars(
-        {
-            (
-                "logging",
-                "logging_config_class",
-            ): f"tests.jobs.test_triggerer_job_logging.{cfg}",
-        }
-    ):
-        importlib.reload(airflow_local_settings)
-        configure_logging()
-
-    # no root handlers
-    assert_handlers(root_logger)
-
-    # default task logger
-    task_logger = logging.getLogger("airflow.task")
-    assert_handlers(task_logger, cls)
-
-    # not yet configured to use wrapper
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is False
-
-    with warnings.catch_warnings(record=True) as captured:
-        triggerer_job.configure_trigger_log_handler()
-
-    assert [x.message.args[0] for x in captured] == msg
-
-    # after config
-    # doesn't use TriggererHandlerWrapper, no change in handler
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is False
-
-    # still no root handlers
-    assert_handlers(root_logger)
-
-
-fallback_task = {
-    "version": 1,
-    "handlers": {
-        "task": {
-            "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
-            "base_log_folder": "~/abc",
-            "s3_log_folder": "s3://abc",
-            "filename_template": "blah",
-        },
-    },
-    "loggers": {"airflow.task": {"handlers": ["task"]}},
-}
-
-
-def test_configure_trigger_log_handler_fallback_task():
-    """
-    root: no handler
-    task: FTH
-    result: wrap
-    """
-    with conf_vars(
-        {
-            ("logging", "logging_config_class"): "tests.jobs.test_triggerer_job_logging.fallback_task",
-        }
-    ):
-        importlib.reload(airflow_local_settings)
-        configure_logging()
-
-    # check custom config used
-    task_logger = logging.getLogger("airflow.task")
-    assert_handlers(task_logger, S3TaskHandler)
-
-    # before config
-    root_logger = logging.getLogger()
-    assert_handlers(root_logger)
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is False
-
-    triggerer_job.configure_trigger_log_handler()
-
-    # after config
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is True
-
-    handlers = assert_handlers(root_logger, TriggererHandlerWrapper)
-    assert handlers[0].base_handler == task_logger.handlers[0]
-    # no filters on wrapper handler
-    assert handlers[0].filters == []
-
-
-root_has_task_handler = {
-    "version": 1,
-    "handlers": {
-        "task": {"class": "logging.Handler"},
-        "trigger": {
-            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
-            "base_log_folder": "blah",
-        },
-    },
-    "loggers": {
-        "airflow.task": {"handlers": ["task"]},
-        "": {"handlers": ["trigger"]},
-    },
-}
-
-
-def test_configure_trigger_log_handler_root_has_task_handler():
-    """
-    root logger: single handler that supports triggerer
-    result: wrap
-    """
-    with conf_vars(
-        {
-            (
-                "logging",
-                "logging_config_class",
-            ): "tests.jobs.test_triggerer_job_logging.root_has_task_handler",
-        }
-    ):
-        configure_logging()
-
-    # check custom config used
-    task_logger = logging.getLogger("airflow.task")
-    assert_handlers(task_logger, logging.Handler)
-
-    # before config
-    root_logger = logging.getLogger()
-    assert_handlers(root_logger, FileTaskHandler)
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is False
-
-    # configure
-    triggerer_job.configure_trigger_log_handler()
-
-    # after config
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is True
-    handlers = assert_handlers(root_logger, TriggererHandlerWrapper)
-    # no filters on wrapper handler
-    assert handlers[0].filters == []
-    # wrapper handler uses handler from airflow.task
-    assert handlers[0].base_handler.__class__ == FileTaskHandler
-
-
-root_not_file_task = {
-    "version": 1,
-    "handlers": {
-        "task": {
-            "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
-            "base_log_folder": "~/abc",
-            "s3_log_folder": "s3://abc",
-            "filename_template": "blah",
-        },
-        "trigger": {"class": "logging.Handler"},
-    },
-    "loggers": {
-        "airflow.task": {"handlers": ["task"]},
-        "": {"handlers": ["trigger"]},
-    },
-}
-
-
-def test_configure_trigger_log_handler_root_not_file_task():
-    """
-    root: A handler that doesn't support trigger or inherit FileTaskHandler
-    task: Supports triggerer
-    Result:
-        * wrap and use the task logger handler
-        * other root handlers filter trigger logging
-    """
-    with conf_vars(
-        {
-            (
-                "logging",
-                "logging_config_class",
-            ): "tests.jobs.test_triggerer_job_logging.root_not_file_task",
-        }
-    ):
-        configure_logging()
-
-    # check custom config used
-    task_logger = logging.getLogger("airflow.task")
-    assert_handlers(task_logger, S3TaskHandler)
-
-    # before config
-    root_logger = logging.getLogger()
-    assert_handlers(root_logger, logging.Handler)
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is False
-
-    # configure
-    with warnings.catch_warnings(record=True) as captured:
-        triggerer_job.configure_trigger_log_handler()
-    assert captured == []
-
-    # after config
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is True
-    handlers = assert_handlers(root_logger, logging.Handler, TriggererHandlerWrapper)
-    # other handlers have DropTriggerLogsFilter
-    assert handlers[0].filters[0].__class__ == DropTriggerLogsFilter
-    # no filters on wrapper handler
-    assert handlers[1].filters == []
-    # wrapper handler uses handler from airflow.task
-    assert handlers[1].base_handler.__class__ == S3TaskHandler
-
-
-root_logger_old_file_task = {
-    "version": 1,
-    "handlers": {
-        "task": {
-            "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
-            "base_log_folder": "~/abc",
-            "s3_log_folder": "s3://abc",
-            "filename_template": "blah",
-        },
-        "trigger": {
-            "class": "tests.jobs.test_triggerer_job_logging.OldFileTaskHandler",
-            "base_log_folder": "abc",
-        },
-    },
-    "loggers": {
-        "airflow.task": {"handlers": ["task"]},
-        "": {"handlers": ["trigger"]},
-    },
-}
-
-
-def test_configure_trigger_log_handler_root_old_file_task():
-    """
-    Root logger handler: An older subclass of FileTaskHandler that doesn't support triggerer
-    Task logger handler: Supports triggerer
-    Result:
-        * wrap and use the task logger handler
-        * other root handlers filter trigger logging
-    """
-
-    with conf_vars(
-        {
-            (
-                "logging",
-                "logging_config_class",
-            ): "tests.jobs.test_triggerer_job_logging.root_logger_old_file_task",
-        }
-    ):
-
-        configure_logging()
-
-    # check custom config used
-    assert_handlers(logging.getLogger("airflow.task"), S3TaskHandler)
-
-    # before config
-    root_logger = logging.getLogger()
-    assert_handlers(root_logger, OldFileTaskHandler)
-
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is False
-
-    with warnings.catch_warnings(record=True) as captured:
-        triggerer_job.configure_trigger_log_handler()
-
-    # since a root logger is explicitly configured with an old FileTaskHandler which doesn't
-    # work properly with individual trigger logging, warn
-    # todo: we should probably just remove the handler in this case; it's basically misconfiguration
-    assert [x.message.args[0] for x in captured] == [
-        "Handler OldFileTaskHandler does not support individual trigger logging. "
-        "Please check the release notes for your provider to see if a newer version "
-        "supports individual trigger logging.",
-    ]
-
-    # after config
-    assert triggerer_job.HANDLER_SUPPORTS_TRIGGERER is True
-    handlers = assert_handlers(root_logger, OldFileTaskHandler, TriggererHandlerWrapper)
-    # other handlers have DropTriggerLogsFilter
-    assert handlers[0].filters[0].__class__ == DropTriggerLogsFilter
-    # no filters on wrapper handler
-    assert handlers[1].filters == []
-    # wrapper handler uses handler from airflow.task
-    assert handlers[1].base_handler.__class__ == S3TaskHandler
diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
index 257aa14440..954eaef5ba 100644
--- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -154,6 +154,23 @@ class TestCloudwatchTaskHandler:
             [{"end_of_log": True}],
         )
 
+    def test_should_read_from_local_on_failure_to_fetch_remote_logs(self):
+        """Check that local logs are displayed on failure to fetch remote logs"""
+        self.cloudwatch_task_handler.set_context(self.ti)
+        with mock.patch.object(self.cloudwatch_task_handler, "get_cloudwatch_logs") as mock_get_logs:
+            mock_get_logs.side_effect = Exception("Failed to connect")
+            log, metadata = self.cloudwatch_task_handler._read(self.ti, self.ti.try_number)
+        expected_log = (
+            f"*** Unable to read remote logs from Cloudwatch (log_group: {self.remote_log_group}, "
+            f"log_stream: {self.remote_log_stream})\n*** Failed to connect\n\n"
+            # The value of "log_pos" is equal to the length of this next line
+            f"*** Reading local file: {self.local_log_location}/{self.remote_log_stream}\n"
+        )
+        assert log == expected_log
+        expected_log_pos = 26 + len(self.local_log_location) + len(self.remote_log_stream)
+        assert metadata == {"end_of_log": False, "log_pos": expected_log_pos}
+        mock_get_logs.assert_called_once_with(stream_name=self.remote_log_stream)
+
     def test_close_prevents_duplicate_calls(self):
         with mock.patch("watchtower.CloudWatchLogHandler.close") as mock_log_handler_close:
             with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.set_context"):
diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py
index 4fbe453274..8b01025e3f 100644
--- a/tests/providers/amazon/aws/log/test_s3_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py
@@ -18,7 +18,6 @@
 from __future__ import annotations
 
 import contextlib
-import copy
 import os
 from unittest import mock
 
@@ -32,7 +31,7 @@ from airflow.operators.empty import EmptyOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler
 from airflow.utils.session import create_session
-from airflow.utils.state import State, TaskInstanceState
+from airflow.utils.state import State
 from airflow.utils.timezone import datetime
 from tests.test_utils.config import conf_vars
 
@@ -127,24 +126,20 @@ class TestS3TaskHandler:
 
     def test_read(self):
         self.conn.put_object(Bucket="bucket", Key=self.remote_log_key, Body=b"Log line\n")
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        log, metadata = self.s3_task_handler.read(ti)
-        actual = log[0][0][-1]
-        expected = "*** Found logs in s3:\n***   * s3://bucket/remote/log/location/1.log\nLog line"
-        assert actual == expected
-        assert metadata == [{"end_of_log": True, "log_pos": 8}]
+        log, metadata = self.s3_task_handler.read(self.ti)
+        assert (
+            log[0][0][-1]
+            == "*** Reading remote log from s3://bucket/remote/log/location/1.log.\nLog line\n\n"
+        )
+        assert metadata == [{"end_of_log": True}]
 
     def test_read_when_s3_log_missing(self):
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        log, metadata = self.s3_task_handler.read(ti)
+        log, metadata = self.s3_task_handler.read(self.ti)
+
         assert 1 == len(log)
         assert len(log) == len(metadata)
-        actual = log[0][0][-1]
-        expected = "*** No logs found on s3 for ti=<TaskInstance: dag_for_testing_s3_task_handler.task_for_testing_s3_log_handler test [success]>\n"  # noqa: E501
-        assert actual == expected
-        assert {"end_of_log": True, "log_pos": 0} == metadata[0]
+        assert "*** Local log file does not exist:" in log[0][0][-1]
+        assert {"end_of_log": True} == metadata[0]
 
     def test_s3_read_when_log_missing(self):
         handler = self.s3_task_handler
diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py
index 690ae9dc33..b801d1fc05 100644
--- a/tests/providers/google/cloud/log/test_gcs_task_handler.py
+++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py
@@ -16,11 +16,9 @@
 # under the License.
 from __future__ import annotations
 
-import copy
 import logging
 import tempfile
 from unittest import mock
-from unittest.mock import MagicMock
 
 import pytest
 from pytest import param
@@ -92,18 +90,15 @@ class TestGCSTaskHandler:
     @mock.patch("google.cloud.storage.Client")
     @mock.patch("google.cloud.storage.Blob")
     def test_should_read_logs_from_remote(self, mock_blob, mock_client, mock_creds):
-        mock_obj = MagicMock()
-        mock_obj.name = "remote/log/location/1.log"
-        mock_client.return_value.list_blobs.return_value = [mock_obj]
         mock_blob.from_string.return_value.download_as_bytes.return_value = b"CONTENT"
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        logs, metadata = self.gcs_task_handler._read(ti, self.ti.try_number)
+
+        logs, metadata = self.gcs_task_handler._read(self.ti, self.ti.try_number)
         mock_blob.from_string.assert_called_once_with(
             "gs://bucket/remote/log/location/1.log", mock_client.return_value
         )
-        assert logs == "*** Found remote logs:\n***   * gs://bucket/remote/log/location/1.log\nCONTENT"
-        assert {"end_of_log": True, "log_pos": 7} == metadata
+
+        assert "*** Reading remote log from gs://bucket/remote/log/location/1.log.\nCONTENT\n" == logs
+        assert {"end_of_log": True} == metadata
 
     @mock.patch(
         "airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id",
@@ -111,25 +106,17 @@ class TestGCSTaskHandler:
     )
     @mock.patch("google.cloud.storage.Client")
     @mock.patch("google.cloud.storage.Blob")
-    def test_should_read_from_local_on_logs_read_error(self, mock_blob, mock_client, mock_creds):
-        mock_obj = MagicMock()
-        mock_obj.name = "remote/log/location/1.log"
-        mock_client.return_value.list_blobs.return_value = [mock_obj]
+    def test_should_read_from_local(self, mock_blob, mock_client, mock_creds):
         mock_blob.from_string.return_value.download_as_bytes.side_effect = Exception("Failed to connect")
 
         self.gcs_task_handler.set_context(self.ti)
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        log, metadata = self.gcs_task_handler._read(ti, self.ti.try_number)
-
-        assert log == (
-            "*** Found remote logs:\n"
-            "***   * gs://bucket/remote/log/location/1.log\n"
-            "*** Unable to read remote log Failed to connect\n"
-            "*** Found local files:\n"
-            f"***   * {self.local_log_location}/1.log\n"
+        log, metadata = self.gcs_task_handler._read(self.ti, self.ti.try_number)
+
+        assert (
+            log == "*** Unable to read remote log from gs://bucket/remote/log/location/1.log\n*** "
+            f"Failed to connect\n\n*** Reading local file: {self.local_log_location}/1.log\n"
         )
-        assert metadata == {"end_of_log": True, "log_pos": 0}
+        assert metadata == {"end_of_log": False, "log_pos": 31 + len(self.local_log_location)}
         mock_blob.from_string.assert_called_once_with(
             "gs://bucket/remote/log/location/1.log", mock_client.return_value
         )
@@ -240,8 +227,7 @@ class TestGCSTaskHandler:
                 mock.call.from_string().download_as_bytes(),
                 mock.call.from_string("gs://bucket/remote/log/location/1.log", mock_client.return_value),
                 mock.call.from_string().upload_from_string(
-                    "MESSAGE\nError checking for previous log; if exists, may be overwritten: Fail to download\n",  # noqa: E501
-                    content_type="text/plain",
+                    "*** Previous log discarded: Fail to download\n\nMESSAGE\n", content_type="text/plain"
                 ),
             ],
             any_order=False,
diff --git a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
index 60b6947f61..3f29ea0c86 100644
--- a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
+++ b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py
@@ -16,9 +16,6 @@
 # under the License.
 from __future__ import annotations
 
-import copy
-import tempfile
-from pathlib import Path
 from unittest import mock
 
 import pytest
@@ -56,7 +53,7 @@ class TestWasbTaskHandler:
     def setup_method(self):
         self.wasb_log_folder = "wasb://container/remote/log/location"
         self.remote_log_location = "remote/log/location/1.log"
-        self.local_log_location = str(Path(tempfile.tempdir) / "local/log/location")
+        self.local_log_location = "local/log/location"
         self.container_name = "wasb-container"
         self.wasb_task_handler = WasbTaskHandler(
             base_log_folder=self.local_log_location,
@@ -65,9 +62,6 @@ class TestWasbTaskHandler:
             delete_local_copy=True,
         )
 
-    def teardown_method(self):
-        self.wasb_task_handler.close()
-
     @conf_vars({("logging", "remote_log_conn_id"): "wasb_default"})
     @mock.patch("airflow.providers.microsoft.azure.hooks.wasb.BlobServiceClient")
     def test_hook(self, mock_service):
@@ -87,11 +81,11 @@ class TestWasbTaskHandler:
     def test_set_context_raw(self, ti):
         ti.raw = True
         self.wasb_task_handler.set_context(ti)
-        assert self.wasb_task_handler.upload_on_close is False
+        assert not self.wasb_task_handler.upload_on_close
 
     def test_set_context_not_raw(self, ti):
         self.wasb_task_handler.set_context(ti)
-        assert self.wasb_task_handler.upload_on_close is True
+        assert self.wasb_task_handler.upload_on_close
 
     @mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook")
     def test_wasb_log_exists(self, mock_hook):
@@ -103,23 +97,20 @@ class TestWasbTaskHandler:
         )
 
     @mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook")
-    def test_wasb_read(self, mock_hook_cls, ti):
-        mock_hook = mock_hook_cls.return_value
-        mock_hook.get_blobs_list.return_value = ["abc/hello.log"]
-        mock_hook.read_file.return_value = "Log line"
+    def test_wasb_read(self, mock_hook, ti):
+        mock_hook.return_value.read_file.return_value = "Log line"
         assert self.wasb_task_handler.wasb_read(self.remote_log_location) == "Log line"
-        ti = copy.copy(ti)
-        ti.state = TaskInstanceState.SUCCESS
         assert self.wasb_task_handler.read(ti) == (
             [
                 [
                     (
                         "localhost",
-                        "*** Found remote logs:\n" "***   * wasb://wasb-container/abc/hello.log\n" "Log line",
+                        "*** Reading remote log from wasb://container/remote/log/location/1.log.\n"
+                        "Log line\n",
                     )
                 ]
             ],
-            [{"end_of_log": True, "log_pos": 8}],
+            [{"end_of_log": True}],
         )
 
     @mock.patch(
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index da7dbc98da..c6e0a6286d 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -74,11 +74,11 @@ class TestLogView:
         logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
         logging_config["handlers"]["task"]["base_log_folder"] = log_dir
         logging_config["handlers"]["task"]["filename_template"] = self.FILENAME_TEMPLATE
-        settings_file = os.path.join(settings_folder, "airflow_local_settings_test.py")
+        settings_file = os.path.join(settings_folder, "airflow_local_settings.py")
         with open(settings_file, "w") as handle:
             new_logging_file = f"LOGGING_CONFIG = {logging_config}"
             handle.writelines(new_logging_file)
-        with conf_vars({("logging", "logging_config_class"): "airflow_local_settings_test.LOGGING_CONFIG"}):
+        with conf_vars({("logging", "logging_config_class"): "airflow_local_settings.LOGGING_CONFIG"}):
             settings.configure_logging()
         yield
         logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
@@ -117,82 +117,79 @@ class TestLogView:
 
     def test_test_read_log_chunks_should_read_one_try(self):
         task_log_reader = TaskLogReader()
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        logs, metadatas = task_log_reader.read_log_chunks(ti=ti, try_number=1, metadata={})
-        assert logs[0] == [
+        logs, metadatas = task_log_reader.read_log_chunks(ti=self.ti, try_number=1, metadata={})
+
+        assert [
             (
                 "localhost",
-                "*** Found local files:\n"
-                f"***   * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
-                "try_number=1.",
+                f"*** Reading local file: "
+                f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
+                f"try_number=1.\n",
             )
-        ]
-        assert metadatas == {"end_of_log": True, "log_pos": 13}
+        ] == logs[0]
+        assert metadatas == {"end_of_log": True, "log_pos": 102 + len(self.log_dir)}
 
     def test_test_read_log_chunks_should_read_all_files(self):
         task_log_reader = TaskLogReader()
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        logs, metadatas = task_log_reader.read_log_chunks(ti=ti, try_number=None, metadata={})
+        logs, metadatas = task_log_reader.read_log_chunks(ti=self.ti, try_number=None, metadata={})
 
-        assert logs == [
+        assert [
             [
                 (
                     "localhost",
-                    "*** Found local files:\n"
-                    f"***   * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
-                    "try_number=1.",
+                    "*** Reading local file: "
+                    f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
+                    "try_number=1.\n",
                 )
             ],
             [
                 (
                     "localhost",
-                    "*** Found local files:\n"
-                    f"***   * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
-                    f"try_number=2.",
+                    f"*** Reading local file: "
+                    f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
+                    f"try_number=2.\n",
                 )
             ],
             [
                 (
                     "localhost",
-                    "*** Found local files:\n"
-                    f"***   * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
-                    f"try_number=3.",
+                    f"*** Reading local file: "
+                    f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
+                    f"try_number=3.\n",
                 )
             ],
-        ]
-        assert metadatas == {"end_of_log": True, "log_pos": 13}
+        ] == logs
+        assert {"end_of_log": True, "log_pos": 102 + len(self.log_dir)} == metadatas
 
     def test_test_test_read_log_stream_should_read_one_try(self):
         task_log_reader = TaskLogReader()
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        stream = task_log_reader.read_log_stream(ti=ti, try_number=1, metadata={})
-        assert list(stream) == [
-            "localhost\n*** Found local files:\n"
-            f"***   * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
+        stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={})
+
+        assert [
+            "localhost\n*** Reading local file: "
+            f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
             "try_number=1.\n"
-        ]
+            "\n"
+        ] == list(stream)
 
     def test_test_test_read_log_stream_should_read_all_logs(self):
         task_log_reader = TaskLogReader()
         self.ti.state = TaskInstanceState.SUCCESS  # Ensure mocked instance is completed to return stream
         stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={})
-        assert list(stream) == [
-            "localhost\n*** Found local files:\n"
-            f"***   * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
-            "try_number=1."
+        assert [
+            "localhost\n*** Reading local file: "
+            f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
+            "try_number=1.\n"
             "\n",
-            "localhost\n*** Found local files:\n"
-            f"***   * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
-            "try_number=2."
+            "localhost\n*** Reading local file: "
+            f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
+            "try_number=2.\n"
             "\n",
-            "localhost\n*** Found local files:\n"
-            f"***   * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
-            "try_number=3."
+            "localhost\n*** Reading local file: "
+            f"{self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
+            "try_number=3.\n"
             "\n",
-        ]
+        ] == list(stream)
 
     @mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read")
     def test_read_log_stream_should_support_multiple_chunks(self, mock_read):
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index 3f762b9a94..04cdeec8c4 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -21,30 +21,19 @@ import logging
 import logging.config
 import os
 import re
-import tempfile
-from pathlib import Path
 from unittest import mock
-from unittest.mock import patch
+from unittest.mock import mock_open, patch
 
-import pendulum
 import pytest
 from kubernetes.client import models as k8s
 
 from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
-from airflow.jobs.base_job import BaseJob
-from airflow.jobs.triggerer_job import TriggererJob
-from airflow.models import DAG, DagRun, TaskInstance, Trigger
+from airflow.models import DAG, DagRun, TaskInstance
 from airflow.operators.python import PythonOperator
-from airflow.utils.log.file_task_handler import (
-    FileTaskHandler,
-    LogType,
-    _interleave_logs,
-    _parse_timestamps_in_log_file,
-)
+from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import set_context
-from airflow.utils.net import get_hostname
 from airflow.utils.session import create_session
-from airflow.utils.state import State, TaskInstanceState
+from airflow.utils.state import State
 from airflow.utils.timezone import datetime
 from airflow.utils.types import DagRunType
 from tests.test_utils.config import conf_vars
@@ -233,50 +222,25 @@ class TestFileTaskLogHandler:
         # Remove the generated tmp log file.
         os.remove(log_filename)
 
-    @patch("airflow.utils.log.file_task_handler.FileTaskHandler._read_from_local")
-    def test__read_when_local(self, mock_read_local, create_task_instance):
-        """
-        Test if local log file exists, then values returned from _read_from_local should be incorporated
-        into returned log.
-        """
-        path = Path(
-            "dag_id=dag_for_testing_local_log_read/run_id=scheduled__2016-01-01T00:00:00+00:00/task_id=task_for_testing_local_log_read/attempt=1.log"  # noqa: E501
-        )
-        mock_read_local.return_value = (["the messages"], ["the log"])
+    def test__read_from_location(self, create_task_instance):
+        """Test if local log file exists, then log is read from it"""
         local_log_file_read = create_task_instance(
             dag_id="dag_for_testing_local_log_read",
             task_id="task_for_testing_local_log_read",
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
         )
-        fth = FileTaskHandler("")
-        actual = fth._read(ti=local_log_file_read, try_number=1)
-        mock_read_local.assert_called_with(path)
-        assert actual == ("*** the messages\nthe log", {"end_of_log": True, "log_pos": 7})
-
-    def test__read_from_local(self):
-        """Tests the behavior of method _read_from_local"""
-
-        with tempfile.TemporaryDirectory() as td:
-            file1 = Path(td, "hello1.log")
-            file2 = Path(td, "hello1.log.suffix.log")
-            file1.write_text("file1 content")
-            file2.write_text("file2 content")
-            fth = FileTaskHandler("")
-            assert fth._read_from_local(file1) == (
-                [
-                    "Found local files:",
-                    f"  * {td}/hello1.log",
-                    f"  * {td}/hello1.log.suffix.log",
-                ],
-                ["file1 content", "file2 content"],
-            )
+        with patch("os.path.exists", return_value=True):
+            opener = mock_open(read_data="dummy test log data")
+            with patch("airflow.utils.log.file_task_handler.open", opener):
+                fth = FileTaskHandler("")
+                log = fth._read(ti=local_log_file_read, try_number=1)
+                assert len(log) == 2
+                assert "dummy test log data" in log[0]
 
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesExecutor.get_task_log")
-    @pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS])
-    def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instance, state):
+    def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instance):
         """Test for k8s executor, the log is read from get_task_log method"""
-        mock_k8s_get_task_log.return_value = ([], [])
         executor_name = "KubernetesExecutor"
         ti = create_task_instance(
             dag_id="dag_for_testing_k8s_executor_log_read",
@@ -284,15 +248,12 @@ class TestFileTaskLogHandler:
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
         )
-        ti.state = state
-        ti.triggerer_job = None
+
         with conf_vars({("core", "executor"): executor_name}):
-            fth = FileTaskHandler("")
-            fth._read(ti=ti, try_number=1)
-        if state == TaskInstanceState.RUNNING:
-            mock_k8s_get_task_log.assert_called_once_with(ti)
-        else:
-            mock_k8s_get_task_log.assert_not_called()
+            with patch("os.path.exists", return_value=False):
+                fth = FileTaskHandler("")
+                fth._read(ti=ti, try_number=1)
+                mock_k8s_get_task_log.assert_called_once_with(ti=ti, log=mock.ANY)
 
     def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instance):
         """Test for executors which do not have `get_task_log` method, it fallbacks to reading
@@ -305,15 +266,19 @@ class TestFileTaskLogHandler:
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
         )
-        ti.state = TaskInstanceState.RUNNING
+
         with conf_vars({("core", "executor"): executor_name}):
-            fth = FileTaskHandler("")
+            with patch("os.path.exists", return_value=False):
+                fth = FileTaskHandler("")
+
+                def mock_log_from_worker(ti, log, log_relative_path):
+                    return (log, {"end_of_log": True})
 
-            fth._read_from_logs_server = mock.Mock()
-            fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
-            actual = fth._read(ti=ti, try_number=1)
-            fth._read_from_logs_server.assert_called_once()
-        assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": True, "log_pos": 16})
+                fth._get_task_log_from_worker = mock.Mock(side_effect=mock_log_from_worker)
+                log = fth._read(ti=ti, try_number=1)
+                fth._get_task_log_from_worker.assert_called_once()
+                assert "Local log file does not exist" in log[0]
+                assert "Failed to fetch log from executor. Falling back to fetching log from worker" in log[0]
 
     @pytest.mark.parametrize(
         "pod_override, namespace_to_call",
@@ -326,7 +291,7 @@ class TestFileTaskLogHandler:
         ],
     )
     @patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="KubernetesExecutor")
-    @patch("airflow.kubernetes.kube_client.get_kube_client")
+    @patch("airflow.executors.kubernetes_executor.get_kube_client")
     def test_read_from_k8s_under_multi_namespace_mode(
         self, mock_kube_client, pod_override, namespace_to_call
     ):
@@ -356,7 +321,7 @@ class TestFileTaskLogHandler:
         file_handler = next((h for h in logger.handlers if h.name == FILE_TASK_HANDLER), None)
         set_context(logger, ti)
         ti.run(ignore_ti_state=True)
-        ti.state = TaskInstanceState.RUNNING
+
         file_handler.read(ti, 3)
 
         # first we find pod name
@@ -389,34 +354,6 @@ class TestFileTaskLogHandler:
             _preload_content=False,
         )
 
-    def test_add_triggerer_suffix(self):
-        sample = "any/path/to/thing.txt"
-        assert FileTaskHandler.add_triggerer_suffix(sample) == sample + ".trigger"
-        assert FileTaskHandler.add_triggerer_suffix(sample, job_id=None) == sample + ".trigger"
-        assert FileTaskHandler.add_triggerer_suffix(sample, job_id=123) == sample + ".trigger.123.log"
-        assert FileTaskHandler.add_triggerer_suffix(sample, job_id="123") == sample + ".trigger.123.log"
-
-    @pytest.mark.parametrize("is_a_trigger", [True, False])
-    def test_set_context_trigger(self, create_dummy_dag, dag_maker, is_a_trigger, session):
-        create_dummy_dag(dag_id="test_fth", task_id="dummy")
-        (ti,) = dag_maker.create_dagrun(execution_date=pendulum.datetime(2023, 1, 1, tz="UTC")).task_instances
-        assert isinstance(ti, TaskInstance)
-        if is_a_trigger:
-            ti.is_trigger_log_context = True
-            job = TriggererJob()
-            t = Trigger("", {})
-            t.triggerer_job = job
-            ti.triggerer = t
-            t.task_instance = ti
-        with tempfile.TemporaryDirectory() as td:
-            h = FileTaskHandler(base_log_folder=td)
-            h.set_context(ti)
-            expected = "dag_id=test_fth/run_id=test/task_id=dummy/attempt=1.log"
-            if is_a_trigger:
-                expected += f".trigger.{job.id}.log"
-            actual = h.handler.baseFilename
-            assert actual.replace(td + "/", "") == expected
-
 
 class TestFilenameRendering:
     def test_python_formatting(self, create_log_template, create_task_instance):
@@ -463,213 +400,5 @@ class TestLogUrl:
             execution_date=DEFAULT_DATE,
         )
         log_url_ti.hostname = "hostname"
-        actual = FileTaskHandler("")._get_log_retrieval_url(log_url_ti, "DYNAMIC_PATH")
-        assert actual == ("http://hostname:8793/log/DYNAMIC_PATH", "DYNAMIC_PATH")
-
-    def test_log_retrieval_valid_trigger(self, create_task_instance):
-        ti = create_task_instance(
-            dag_id="dag_for_testing_filename_rendering",
-            task_id="task_for_testing_filename_rendering",
-            run_type=DagRunType.SCHEDULED,
-            execution_date=DEFAULT_DATE,
-        )
-        ti.hostname = "hostname"
-        trigger = Trigger("", {})
-        job = BaseJob()
-        job.id = 123
-        trigger.triggerer_job = job
-        ti.trigger = trigger
-        actual = FileTaskHandler("")._get_log_retrieval_url(ti, "DYNAMIC_PATH", log_type=LogType.TRIGGER)
-        hostname = get_hostname()
-        assert actual == (
-            f"http://{hostname}:8794/log/DYNAMIC_PATH.trigger.123.log",
-            "DYNAMIC_PATH.trigger.123.log",
-        )
-
-
-log_sample = """[2022-11-16T00:05:54.278-0800] {taskinstance.py:1257} INFO -
---------------------------------------------------------------------------------
-[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1
-[2022-11-16T00:05:54.279-0800] {taskinstance.py:1259} INFO -
---------------------------------------------------------------------------------
-[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2022-11-16 08:05:52.324532+00:00
-[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task
-[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task
-[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task
-[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', 'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', '33648', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']
-[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait
-[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running <TaskInstance: simple_async_timedelta.wait manual__2022-11-16T08:05:52.324532+00:00 [running]> on host daniels-mbp-2.lan
-[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - Exporting the following env vars:
-AIRFLOW_CTX_DAG_OWNER=airflow
-AIRFLOW_CTX_DAG_ID=simple_async_timedelta
-AIRFLOW_CTX_TASK_ID=wait
-AIRFLOW_CTX_EXECUTION_DATE=2022-11-16T08:05:52.324532+00:00
-AIRFLOW_CTX_TRY_NUMBER=1
-AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00
-[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554
-"""  # noqa: E501
-
-
-def test_parse_timestamps():
-    actual = []
-    for timestamp, idx, line in _parse_timestamps_in_log_file(log_sample.splitlines()):
-        actual.append(timestamp)
-    assert actual == [
-        pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.278000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.279000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.279000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.295000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.300000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.300000-08:00"),  # duplicate
-        pendulum.parse("2022-11-16T00:05:54.300000-08:00"),  # duplicate
-        pendulum.parse("2022-11-16T00:05:54.306000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.309000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.457000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.592000-08:00"),
-        pendulum.parse("2022-11-16T00:05:54.604000-08:00"),
-    ]
-
-
-def test_interleave_interleaves():
-
-    log_sample1 = "\n".join(
-        [
-            "[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1",
-        ]
-    )
-    log_sample2 = "\n".join(
-        [
-            "[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2022-11-16 08:05:52.324532+00:00",  # noqa: E501
-            "[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task",  # noqa: E501
-            "[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task",  # noqa: E501
-            "[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task",  # noqa: E501
-            "[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', 'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', '33648', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']",  # noqa: E501
-            "[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait",  # noqa: E501
-        ]
-    )
-    log_sample3 = "\n".join(
-        [
-            "[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running <TaskInstance: simple_async_timedelta.wait manual__2022-11-16T08:05:52.324532+00:00 [running]> on host daniels-mbp-2.lan",  # noqa: E501
-            "[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER=airflow",  # noqa: E501
-            "AIRFLOW_CTX_DAG_ID=simple_async_timedelta",
-            "AIRFLOW_CTX_TASK_ID=wait",
-            "AIRFLOW_CTX_EXECUTION_DATE=2022-11-16T08:05:52.324532+00:00",
-            "AIRFLOW_CTX_TRY_NUMBER=1",
-            "AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00",
-            "[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554",  # noqa: E501
-        ]
-    )
-    expected = "\n".join(
-        [
-            "[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1",  # noqa: E501
-            "[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2022-11-16 08:05:52.324532+00:00",  # noqa: E501
-            "[2022-11-16T00:05:54.300-0800] {standard_task_runner.py:55} INFO - Started process 52536 to run task",  # noqa: E501
-            "[2022-11-16T00:05:54.306-0800] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'simple_async_timedelta', 'wait', 'manual__2022-11-16T08:05:52.324532+00:00', '--job-id', '33648', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp725r305n']",  # noqa: E501
-            "[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait",  # noqa: E501
-            "[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running <TaskInstance: simple_async_timedelta.wait manual__2022-11-16T08:05:52.324532+00:00 [running]> on host daniels-mbp-2.lan",  # noqa: E501
-            "[2022-11-16T00:05:54.592-0800] {taskinstance.py:1485} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER=airflow",  # noqa: E501
-            "AIRFLOW_CTX_DAG_ID=simple_async_timedelta",
-            "AIRFLOW_CTX_TASK_ID=wait",
-            "AIRFLOW_CTX_EXECUTION_DATE=2022-11-16T08:05:52.324532+00:00",
-            "AIRFLOW_CTX_TRY_NUMBER=1",
-            "AIRFLOW_CTX_DAG_RUN_ID=manual__2022-11-16T08:05:52.324532+00:00",
-            "[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554",  # noqa: E501
-        ]
-    )
-    assert "\n".join(_interleave_logs(log_sample2, log_sample1, log_sample3)) == expected
-
-
-long_sample = """
-*** yoyoyoyo
-[2023-01-15T22:36:46.474-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
-[2023-01-15T22:36:46.482-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
-[2023-01-15T22:36:46.483-0800] {taskinstance.py:1332} INFO - Starting attempt 1 of 1
-[2023-01-15T22:36:46.516-0800] {taskinstance.py:1351} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2023-01-16 06:36:43.044492+00:00
-[2023-01-15T22:36:46.522-0800] {standard_task_runner.py:56} INFO - Started process 38807 to run task
-[2023-01-15T22:36:46.530-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '487', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmpiwyl54bn', '--no-shut-down-logging']
-[2023-01-15T22:36:46.536-0800] {standard_task_runner.py:84} INFO - Job 487: Subtask wait
-[2023-01-15T22:36:46.624-0800] {task_command.py:417} INFO - Running <TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [running]> on host daniels-mbp-2.lan
-[2023-01-15T22:36:46.918-0800] {taskinstance.py:1558} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example_time_delta_sensor_async' AIRFLOW_CTX_TASK_ID='wait' AIRFLOW_CTX_EXECUTION_DATE='2023-01-16T06:36:43.044492+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-01-16T06:36:43.044492+00:00'
-[2023-01-15T22:36:46.929-0800] {taskinstance.py:1433} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646
-[2023-01-15T22:36:46.981-0800] {local_task_job.py:218} INFO - Task exited with return code 100 (task deferral)
-
-[2023-01-15T22:36:46.474-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
-[2023-01-15T22:36:46.482-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
-[2023-01-15T22:36:46.483-0800] {taskinstance.py:1332} INFO - Starting attempt 1 of 1
-[2023-01-15T22:36:46.516-0800] {taskinstance.py:1351} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2023-01-16 06:36:43.044492+00:00
-[2023-01-15T22:36:46.522-0800] {standard_task_runner.py:56} INFO - Started process 38807 to run task
-[2023-01-15T22:36:46.530-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '487', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmpiwyl54bn', '--no-shut-down-logging']
-[2023-01-15T22:36:46.536-0800] {standard_task_runner.py:84} INFO - Job 487: Subtask wait
-[2023-01-15T22:36:46.624-0800] {task_command.py:417} INFO - Running <TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [running]> on host daniels-mbp-2.lan
-[2023-01-15T22:36:46.918-0800] {taskinstance.py:1558} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='example_time_delta_sensor_async' AIRFLOW_CTX_TASK_ID='wait' AIRFLOW_CTX_EXECUTION_DATE='2023-01-16T06:36:43.044492+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-01-16T06:36:43.044492+00:00'
-[2023-01-15T22:36:46.929-0800] {taskinstance.py:1433} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646
-[2023-01-15T22:36:46.981-0800] {local_task_job.py:218} INFO - Task exited with return code 100 (task deferral)
-[2023-01-15T22:37:17.673-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
-[2023-01-15T22:37:17.681-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [queued]>
-[2023-01-15T22:37:17.682-0800] {taskinstance.py:1330} INFO - resuming after deferral
-[2023-01-15T22:37:17.693-0800] {taskinstance.py:1351} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2023-01-16 06:36:43.044492+00:00
-[2023-01-15T22:37:17.697-0800] {standard_task_runner.py:56} INFO - Started process 39090 to run task
-[2023-01-15T22:37:17.703-0800] {standard_task_runner.py:83} INFO - Running: ['airflow', 'tasks', 'run', 'example_time_delta_sensor_async', 'wait', 'manual__2023-01-16T06:36:43.044492+00:00', '--job-id', '488', '--raw', '--subdir', '/Users/dstandish/code/airflow/airflow/example_dags/example_time_delta_sensor_async.py', '--cfg-path', '/var/folders/7_/1xx0hqcs3txd7kqt0ngfdjth0000gn/T/tmp_sa9sau4', '--no-shut-down-logging']
-[2023-01-15T22:37:17.707-0800] {standard_task_runner.py:84} INFO - Job 488: Subtask wait
-[2023-01-15T22:37:17.771-0800] {task_command.py:417} INFO - Running <TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-16T06:36:43.044492+00:00 [running]> on host daniels-mbp-2.lan
-[2023-01-15T22:37:18.043-0800] {taskinstance.py:1369} INFO - Marking task as SUCCESS. dag_id=example_time_delta_sensor_async, task_id=wait, execution_date=20230116T063643, start_date=20230116T063646, end_date=20230116T063718
-[2023-01-15T22:37:18.117-0800] {local_task_job.py:220} INFO - Task exited with return code 0
-[2023-01-15T22:37:18.147-0800] {taskinstance.py:2648} INFO - 0 downstream tasks scheduled from follow-on schedule check
-[2023-01-15T22:37:18.173-0800] {:0} Level None - end_of_log
-
-*** hihihi!
-[2023-01-15T22:36:48.348-0800] {temporal.py:62} INFO - trigger starting
-[2023-01-15T22:36:48.348-0800] {temporal.py:66} INFO - 24 seconds remaining; sleeping 10 seconds
-[2023-01-15T22:36:58.349-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:36:59.349-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:00.349-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:01.350-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:02.350-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:03.351-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:04.351-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:05.353-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:06.354-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:07.355-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:08.356-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:09.357-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:10.358-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:11.359-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:12.359-0800] {temporal.py:71} INFO - sleeping 1 second...
-[2023-01-15T22:37:13.360-0800] {temporal.py:74} INFO - yielding event with payload DateTime(2023, 1, 16, 6, 37, 13, 44492, tzinfo=Timezone('UTC'))
-[2023-01-15T22:37:13.361-0800] {triggerer_job.py:540} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2023-01-16T06:37:13.044492+00:00> (ID 106) fired: TriggerEvent<DateTime(2023, 1, 16, 6, 37, 13, 44492, tzinfo=Timezone('UTC'))>
-"""  # noqa: E501
-
-# for line in _parse_timestamps_in_log_file(long_sample.splitlines()):
-#     print(line)
-# for line in _interleave_logs(long_sample):
-#     print(line)
-
-
-def test_this():
-    """
-    Notice there are two messages with timestamp `2023-01-17T12:47:11.883-0800`.
-    In this case, these should appear in correct order and be deduped in result.
-    """
-    sample_with_dupe = """[2023-01-17T12:46:55.868-0800] {temporal.py:62} INFO - trigger starting
-    [2023-01-17T12:46:55.868-0800] {temporal.py:71} INFO - sleeping 1 second...
-    [2023-01-17T12:47:09.882-0800] {temporal.py:71} INFO - sleeping 1 second...
-    [2023-01-17T12:47:10.882-0800] {temporal.py:71} INFO - sleeping 1 second...
-    [2023-01-17T12:47:11.883-0800] {temporal.py:74} INFO - yielding event with payload DateTime(2023, 1, 17, 20, 47, 11, 254388, tzinfo=Timezone('UTC'))
-    [2023-01-17T12:47:11.883-0800] {triggerer_job.py:540} INFO - Trigger <airflow.triggers.temporal.DateTimeTrigger moment=2023-01-17T20:47:11.254388+00:00> (ID 1) fired: TriggerEvent<DateTime(2023, 1, 17, 20, 47, 11, 254388, tzinfo=Timezone('UTC'))>
-    """  # noqa: E501
-
-    assert sample_with_dupe == "\n".join(_interleave_logs(sample_with_dupe, "", sample_with_dupe))
-
-
-def test_ttthis():
-    local_logs = "[2023-01-17T13:41:53.228-0800] {temporal.py:62} INFO - trigger starting\n[2023-01-17T13:41:53.228-0800] {temporal.py:71} INFO - sleeping 1 second...\n[2023-01-17T13:41:54.228-0800] {temporal.py:71} INFO - sleeping 1 second...\n[2023-01-17T13:41:55.229-0800] {temporal.py:71} INFO - sleeping 1 second...\n[2023-01-17T13:41:56.229-0800] {temporal.py:71} INFO - sleeping 1 second...\n[2023-01-17T13:41:57.229-0800] {temporal.py:71} INFO - sleeping 1 second...\n[2023-01-17T13:4 [...]
-    remote_logs = "[2023-01-17T13:41:51.446-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-17T21:41:38.150776+00:00 [queued]>\n[2023-01-17T13:41:51.454-0800] {taskinstance.py:1131} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: example_time_delta_sensor_async.wait manual__2023-01-17T21:41:38.150776+00:00 [queued]>\n[2023-01-17T13:41:51.454- [...]
-    print("\n".join(_interleave_logs(local_logs, remote_logs)))
+        url = FileTaskHandler._get_log_retrieval_url(log_url_ti, "DYNAMIC_PATH")
+        assert url == "http://hostname:8793/log/DYNAMIC_PATH"
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index cc8366dee2..c0160bb2ba 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -22,7 +22,6 @@ import json
 import re
 import unittest.mock
 import urllib.parse
-from getpass import getuser
 
 import pytest
 import time_machine
@@ -781,10 +780,10 @@ def _get_appbuilder_pk_string(model_view_cls, instance) -> str:
 
     Example usage::
 
-        from airflow.www.views import TaskInstanceModelView
-        ti = session.Query(TaskInstance).filter(...).one()
-        pk = _get_appbuilder_pk_string(TaskInstanceModelView, ti)
-        client.post("...", data={"action": "...", "rowid": pk})
+        >>> from airflow.www.views import TaskInstanceModelView
+        >>> ti = session.Query(TaskInstance).filter(...).one()
+        >>> pk = _get_appbuilder_pk_string(TaskInstanceModelView, ti)
+        >>> client.post("...", data={"action": "...", "rowid": pk})
     """
     pk_value = model_view_cls.datamodel.get_pk_value(instance)
     return model_view_cls._serialize_pk_if_composite(model_view_cls, pk_value)
@@ -1060,7 +1059,7 @@ def test_task_instances(admin_client):
             "trigger_id": None,
             "trigger_timeout": None,
             "try_number": 1,
-            "unixname": getuser(),
+            "unixname": "root",
             "updated_at": DEFAULT_DATE.isoformat(),
         },
         "run_after_loop": {
@@ -1090,7 +1089,7 @@ def test_task_instances(admin_client):
             "trigger_id": None,
             "trigger_timeout": None,
             "try_number": 1,
-            "unixname": getuser(),
+            "unixname": "root",
             "updated_at": DEFAULT_DATE.isoformat(),
         },
         "run_this_last": {
@@ -1120,7 +1119,7 @@ def test_task_instances(admin_client):
             "trigger_id": None,
             "trigger_timeout": None,
             "try_number": 1,
-            "unixname": getuser(),
+            "unixname": "root",
             "updated_at": DEFAULT_DATE.isoformat(),
         },
         "runme_0": {
@@ -1150,7 +1149,7 @@ def test_task_instances(admin_client):
             "trigger_id": None,
             "trigger_timeout": None,
             "try_number": 1,
-            "unixname": getuser(),
+            "unixname": "root",
             "updated_at": DEFAULT_DATE.isoformat(),
         },
         "runme_1": {
@@ -1180,7 +1179,7 @@ def test_task_instances(admin_client):
             "trigger_id": None,
             "trigger_timeout": None,
             "try_number": 1,
-            "unixname": getuser(),
+            "unixname": "root",
             "updated_at": DEFAULT_DATE.isoformat(),
         },
         "runme_2": {
@@ -1210,7 +1209,7 @@ def test_task_instances(admin_client):
             "trigger_id": None,
             "trigger_timeout": None,
             "try_number": 1,
-            "unixname": getuser(),
+            "unixname": "root",
             "updated_at": DEFAULT_DATE.isoformat(),
         },
         "this_will_skip": {
@@ -1240,7 +1239,7 @@ def test_task_instances(admin_client):
             "trigger_id": None,
             "trigger_timeout": None,
             "try_number": 1,
-            "unixname": getuser(),
+            "unixname": "root",
             "updated_at": DEFAULT_DATE.isoformat(),
         },
     }