Posted to commits@airflow.apache.org by po...@apache.org on 2023/08/12 09:49:24 UTC

[airflow] branch main updated: D401 Support - airflow/example_dags thru airflow/listeners (#33336)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d0c94d6bee D401 Support - airflow/example_dags thru airflow/listeners (#33336)
d0c94d6bee is described below

commit d0c94d6bee2a9494e44f29c2c242c956877e9619
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Sat Aug 12 02:49:17 2023 -0700

    D401 Support - airflow/example_dags thru airflow/listeners (#33336)
---
 airflow/executors/base_executor.py                 | 18 +++++++-------
 airflow/executors/executor_loader.py               | 13 +++++-----
 airflow/executors/local_executor.py                | 22 +++++++++-------
 airflow/hooks/base.py                              |  6 ++---
 airflow/hooks/filesystem.py                        |  4 +--
 airflow/hooks/package_index.py                     |  8 +++---
 airflow/hooks/subprocess.py                        |  2 +-
 airflow/jobs/backfill_job_runner.py                |  2 +-
 airflow/jobs/base_job_runner.py                    | 12 ++++++---
 airflow/jobs/job.py                                | 12 ++++-----
 airflow/jobs/local_task_job_runner.py              |  4 +--
 airflow/jobs/scheduler_job_runner.py               | 10 ++++----
 airflow/jobs/triggerer_job_runner.py               | 29 +++++++++++-----------
 .../pre_7_4_0_compatibility/kube_client.py         |  4 +--
 .../pre_7_4_0_compatibility/pod_generator.py       |  8 +++---
 .../pod_generator_deprecated.py                    |  6 ++---
 .../kubernetes/pre_7_4_0_compatibility/secret.py   |  6 ++---
 airflow/lineage/__init__.py                        |  4 +--
 airflow/lineage/backend.py                         |  2 +-
 airflow/listeners/spec/dagrun.py                   |  6 ++---
 airflow/listeners/spec/lifecycle.py                |  4 +--
 airflow/listeners/spec/taskinstance.py             |  6 ++---
 22 files changed, 99 insertions(+), 89 deletions(-)
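
For reference, D401 is the pydocstyle rule "First line should be in imperative mood": the first line of a docstring should read as a command ("Return ...") rather than a description ("Returns ..."). Below is a minimal sketch of the pattern applied throughout this diff, using a hypothetical function that is not part of the commit:

    # Before: first line in the indicative mood ("Returns ..."), which D401 flags.
    def get_default_timeout() -> int:
        """Returns the default timeout in seconds."""
        return 30

    # After: first line rewritten in the imperative mood ("Return ..."), which D401 accepts.
    # The behaviour of the function itself is unchanged.
    def get_default_timeout() -> int:
        """Return the default timeout in seconds."""
        return 30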

diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 97de6be1fa..81c441b521 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -188,7 +188,7 @@ class BaseExecutor(LoggingMixin):
 
     def has_task(self, task_instance: TaskInstance) -> bool:
         """
-        Checks if a task is either queued or running in this executor.
+        Check if a task is either queued or running in this executor.
 
         :param task_instance: TaskInstance
         :return: True if the task is known to this executor
@@ -250,7 +250,7 @@ class BaseExecutor(LoggingMixin):
 
     def trigger_tasks(self, open_slots: int) -> None:
         """
-        Initiates async execution of the queued tasks, up to the number of available slots.
+        Initiate async execution of the queued tasks, up to the number of available slots.
 
         :param open_slots: Number of open slots
         """
@@ -298,7 +298,7 @@ class BaseExecutor(LoggingMixin):
 
     def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None) -> None:
         """
-        Changes state of the task.
+        Change state of the task.
 
         :param info: Executor information for the task instance
         :param key: Unique key for the task instance
@@ -358,7 +358,7 @@ class BaseExecutor(LoggingMixin):
         executor_config: Any | None = None,
     ) -> None:  # pragma: no cover
         """
-        This method will execute the command asynchronously.
+        Execute the command asynchronously.
 
         :param key: Unique key for the task instance
         :param command: Command to run
@@ -369,7 +369,7 @@ class BaseExecutor(LoggingMixin):
 
     def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
         """
-        This method can be implemented by any child class to return the task logs.
+        Return the task logs.
 
         :param ti: A TaskInstance object
         :param try_number: current try_number to read log from
@@ -382,7 +382,7 @@ class BaseExecutor(LoggingMixin):
         raise NotImplementedError()
 
     def terminate(self):
-        """This method is called when the daemon receives a SIGTERM."""
+        """Get called when the daemon receives a SIGTERM."""
         raise NotImplementedError()
 
     def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:  # pragma: no cover
@@ -458,7 +458,7 @@ class BaseExecutor(LoggingMixin):
         return None, None
 
     def debug_dump(self):
-        """Called in response to SIGUSR2 by the scheduler."""
+        """Get called in response to SIGUSR2 by the scheduler."""
         self.log.info(
             "executor.queued (%d)\n\t%s",
             len(self.queued_tasks),
@@ -472,7 +472,7 @@ class BaseExecutor(LoggingMixin):
         )
 
     def send_callback(self, request: CallbackRequest) -> None:
-        """Sends callback for execution.
+        """Send callback for execution.
 
         Provides a default implementation which sends the callback to the `callback_sink` object.
 
@@ -493,7 +493,7 @@ class BaseExecutor(LoggingMixin):
 
     @classmethod
     def _get_parser(cls) -> argparse.ArgumentParser:
-        """This method is used by Sphinx argparse to generate documentation.
+        """Generate documentation; used by Sphinx argparse.
 
         :meta private:
         """
diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py
index 4a4bda8583..73c1149515 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -72,7 +72,7 @@ class ExecutorLoader:
 
     @classmethod
     def get_default_executor_name(cls) -> str:
-        """Returns the default executor name from Airflow configuration.
+        """Return the default executor name from Airflow configuration.
 
         :return: executor name from Airflow configuration
         """
@@ -82,7 +82,7 @@ class ExecutorLoader:
 
     @classmethod
     def get_default_executor(cls) -> BaseExecutor:
-        """Creates a new instance of the configured executor if none exists and returns it."""
+        """Create a new instance of the configured executor if none exists and returns it."""
         if cls._default_executor is not None:
             return cls._default_executor
 
@@ -91,7 +91,7 @@ class ExecutorLoader:
     @classmethod
     def load_executor(cls, executor_name: str) -> BaseExecutor:
         """
-        Loads the executor.
+        Load the executor.
 
         This supports the following formats:
         * by executor name for core executor
@@ -123,7 +123,7 @@ class ExecutorLoader:
         cls, executor_name: str, validate: bool = True
     ) -> tuple[type[BaseExecutor], ConnectorSource]:
         """
-        Imports the executor class.
+        Import the executor class.
 
         Supports the same formats as ExecutorLoader.load_executor.
 
@@ -159,7 +159,7 @@ class ExecutorLoader:
     @classmethod
     def import_default_executor_cls(cls, validate: bool = True) -> tuple[type[BaseExecutor], ConnectorSource]:
         """
-        Imports the default executor class.
+        Import the default executor class.
 
         :param validate: Whether or not to validate the executor before returning
 
@@ -172,7 +172,8 @@ class ExecutorLoader:
     @classmethod
     @functools.lru_cache(maxsize=None)
     def validate_database_executor_compatibility(cls, executor: type[BaseExecutor]) -> None:
-        """Validate database and executor compatibility.
+        """
+        Validate database and executor compatibility.
 
         Most of the databases work universally, but SQLite can only work with
         single-threaded executors (e.g. Sequential).
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 7f83f8c7a2..cf88ca13b2 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -75,7 +75,7 @@ class LocalWorkerBase(Process, LoggingMixin):
 
     def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
         """
-        Executes command received and stores result state in queue.
+        Execute command received and store result state in queue.
 
         :param key: the key to identify the task instance
         :param command: the command to execute
@@ -141,7 +141,7 @@ class LocalWorkerBase(Process, LoggingMixin):
 
     @abstractmethod
     def do_work(self):
-        """Called in the subprocess and should then execute tasks."""
+        """Execute tasks; called in the subprocess."""
         raise NotImplementedError()
 
 
@@ -236,7 +236,7 @@ class LocalExecutor(BaseExecutor):
             self.executor: LocalExecutor = executor
 
         def start(self) -> None:
-            """Starts the executor."""
+            """Start the executor."""
             self.executor.workers_used = 0
             self.executor.workers_active = 0
 
@@ -248,7 +248,7 @@ class LocalExecutor(BaseExecutor):
             executor_config: Any | None = None,
         ) -> None:
             """
-            Executes task asynchronously.
+            Execute task asynchronously.
 
             :param key: the key to identify the task instance
             :param command: the command to execute
@@ -291,7 +291,7 @@ class LocalExecutor(BaseExecutor):
             self.queue: Queue[ExecutorWorkType] | None = None
 
         def start(self) -> None:
-            """Starts limited parallelism implementation."""
+            """Start limited parallelism implementation."""
             if TYPE_CHECKING:
                 assert self.executor.manager
                 assert self.executor.result_queue
@@ -315,7 +315,7 @@ class LocalExecutor(BaseExecutor):
             executor_config: Any | None = None,
         ) -> None:
             """
-            Executes task asynchronously.
+            Execute task asynchronously.
 
             :param key: the key to identify the task instance
             :param command: the command to execute
@@ -340,7 +340,11 @@ class LocalExecutor(BaseExecutor):
                     break
 
         def end(self):
-            """Ends the executor. Sends the poison pill to all workers."""
+            """
+            End the executor.
+
+            Sends the poison pill to all workers.
+            """
             for _ in self.executor.workers:
                 self.queue.put((None, None))
 
@@ -349,7 +353,7 @@ class LocalExecutor(BaseExecutor):
             self.executor.sync()
 
     def start(self) -> None:
-        """Starts the executor."""
+        """Start the executor."""
         old_proctitle = getproctitle()
         setproctitle("airflow executor -- LocalExecutor")
         self.manager = Manager()
@@ -389,7 +393,7 @@ class LocalExecutor(BaseExecutor):
         self.impl.sync()
 
     def end(self) -> None:
-        """Ends the executor."""
+        """End the executor."""
         if TYPE_CHECKING:
             assert self.impl
             assert self.manager
diff --git a/airflow/hooks/base.py b/airflow/hooks/base.py
index c1c758a756..0974813c5a 100644
--- a/airflow/hooks/base.py
+++ b/airflow/hooks/base.py
@@ -76,7 +76,7 @@ class BaseHook(LoggingMixin):
     @classmethod
     def get_hook(cls, conn_id: str) -> BaseHook:
         """
-        Returns default hook for this connection id.
+        Return default hook for this connection id.
 
         :param conn_id: connection id
         :return: default hook for this connection
@@ -85,7 +85,7 @@ class BaseHook(LoggingMixin):
         return connection.get_hook()
 
     def get_conn(self) -> Any:
-        """Returns connection for the hook."""
+        """Return connection for the hook."""
         raise NotImplementedError()
 
     @classmethod
@@ -144,7 +144,7 @@ class DiscoverableHook(Protocol):
     @staticmethod
     def get_connection_form_widgets() -> dict[str, Any]:
         """
-        Returns dictionary of widgets to be added for the hook to handle extra values.
+        Return dictionary of widgets to be added for the hook to handle extra values.
 
         If you have class hierarchy, usually the widgets needed by your class are already
         added by the base class, so there is no need to implement this method. It might
diff --git a/airflow/hooks/filesystem.py b/airflow/hooks/filesystem.py
index d436e0050c..a47fba7225 100644
--- a/airflow/hooks/filesystem.py
+++ b/airflow/hooks/filesystem.py
@@ -43,7 +43,7 @@ class FSHook(BaseHook):
 
     @staticmethod
     def get_connection_form_widgets() -> dict[str, Any]:
-        """Returns connection widgets to add to connection form."""
+        """Return connection widgets to add to connection form."""
         from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
         from flask_babel import lazy_gettext
         from wtforms import StringField
@@ -52,7 +52,7 @@ class FSHook(BaseHook):
 
     @staticmethod
     def get_ui_field_behaviour() -> dict[str, Any]:
-        """Returns custom field behaviour."""
+        """Return custom field behaviour."""
         return {
             "hidden_fields": ["host", "schema", "port", "login", "password", "extra"],
             "relabeling": {},
diff --git a/airflow/hooks/package_index.py b/airflow/hooks/package_index.py
index 5c940506a1..87e830f68a 100644
--- a/airflow/hooks/package_index.py
+++ b/airflow/hooks/package_index.py
@@ -40,7 +40,7 @@ class PackageIndexHook(BaseHook):
 
     @staticmethod
     def get_ui_field_behaviour() -> dict[str, Any]:
-        """Returns custom field behaviour."""
+        """Return custom field behaviour."""
         return {
             "hidden_fields": ["schema", "port", "extra"],
             "relabeling": {"host": "Package Index URL"},
@@ -53,7 +53,7 @@ class PackageIndexHook(BaseHook):
 
     @staticmethod
     def _get_basic_auth_conn_url(index_url: str, user: str | None, password: str | None) -> str:
-        """Returns a connection URL with basic auth credentials based on connection config."""
+        """Return a connection URL with basic auth credentials based on connection config."""
         url = urlparse(index_url)
         host = url.netloc.split("@")[-1]
         if user:
@@ -64,11 +64,11 @@ class PackageIndexHook(BaseHook):
         return url._replace(netloc=host).geturl()
 
     def get_conn(self) -> Any:
-        """Returns connection for the hook."""
+        """Return connection for the hook."""
         return self.get_connection_url()
 
     def get_connection_url(self) -> Any:
-        """Returns a connection URL with embedded credentials."""
+        """Return a connection URL with embedded credentials."""
         conn = self.get_connection(self.pi_conn_id)
         index_url = conn.host
         if not index_url:
diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py
index af901789e4..051b4cf662 100644
--- a/airflow/hooks/subprocess.py
+++ b/airflow/hooks/subprocess.py
@@ -100,7 +100,7 @@ class SubprocessHook(BaseHook):
         return SubprocessResult(exit_code=return_code, output=line)
 
     def send_sigterm(self):
-        """Sends SIGTERM signal to ``self.sub_process`` if one exists."""
+        """Send SIGTERM signal to ``self.sub_process`` if one exists."""
         self.log.info("Sending SIGTERM signal to process group")
         if self.sub_process and hasattr(self.sub_process, "pid"):
             os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)
diff --git a/airflow/jobs/backfill_job_runner.py b/airflow/jobs/backfill_job_runner.py
index 3d2c20c612..2508ba69ab 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -170,7 +170,7 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
 
     def _update_counters(self, ti_status: _DagRunTaskStatus, session: Session) -> None:
         """
-        Updates the counters per state of the tasks that were running.
+        Update the counters per state of the tasks that were running.
 
         Can re-add to tasks to run when required.
 
diff --git a/airflow/jobs/base_job_runner.py b/airflow/jobs/base_job_runner.py
index fd3060db81..611579b239 100644
--- a/airflow/jobs/base_job_runner.py
+++ b/airflow/jobs/base_job_runner.py
@@ -46,7 +46,9 @@ class BaseJobRunner(Generic[J]):
 
     def _execute(self) -> int | None:
         """
-        Executes the logic connected to the runner. This method should be overridden by subclasses.
+        Execute the logic connected to the runner.
+
+        This method should be overridden by subclasses.
 
         :meta private:
         :return: return code if available, otherwise None
@@ -55,12 +57,16 @@ class BaseJobRunner(Generic[J]):
 
     @provide_session
     def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
-        """Callback that is called during heartbeat. This method can be overwritten by the runners."""
+        """
+        Execute callback during heartbeat.
+
+        This method can be overwritten by the runners.
+        """
 
     @classmethod
     @provide_session
     def most_recent_job(cls, session: Session = NEW_SESSION) -> Job | None:
-        """Returns the most recent job of this type, if any, based on last heartbeat received."""
+        """Return the most recent job of this type, if any, based on last heartbeat received."""
         from airflow.jobs.job import most_recent_job
 
         return most_recent_job(cls.job_type, session=session)
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 264eed15aa..f20808868d 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -147,7 +147,7 @@ class Job(Base, LoggingMixin):
 
     @provide_session
     def kill(self, session: Session = NEW_SESSION) -> NoReturn:
-        """Handles on_kill callback and updates state in database."""
+        """Handle on_kill callback and updates state in database."""
         job = session.scalar(select(Job).where(Job.id == self.id).limit(1))
         job.end_date = timezone.utcnow()
         try:
@@ -222,7 +222,7 @@ class Job(Base, LoggingMixin):
 
     @provide_session
     def prepare_for_execution(self, session: Session = NEW_SESSION):
-        """Prepares the job for execution."""
+        """Prepare the job for execution."""
         Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
         self.state = JobState.RUNNING
         self.start_date = timezone.utcnow()
@@ -240,7 +240,7 @@ class Job(Base, LoggingMixin):
 
     @provide_session
     def most_recent_job(self, session: Session = NEW_SESSION) -> Job | None:
-        """Returns the most recent job of this type, if any, based on last heartbeat received."""
+        """Return the most recent job of this type, if any, based on last heartbeat received."""
         return most_recent_job(self.job_type, session=session)
 
 
@@ -272,7 +272,7 @@ def run_job(
     job: Job | JobPydantic, execute_callable: Callable[[], int | None], session: Session = NEW_SESSION
 ) -> int | None:
     """
-    Runs the job.
+    Run the job.
 
     The Job is always an ORM object and setting the state is happening within the
     same DB session and the session is kept open throughout the whole execution.
@@ -293,7 +293,7 @@ def run_job(
 
 def execute_job(job: Job | JobPydantic, execute_callable: Callable[[], int | None]) -> int | None:
     """
-    Executes the job.
+    Execute the job.
 
     Job execution requires no session as generally executing session does not require an
     active database connection. The session might be temporary acquired and used if the job
@@ -331,7 +331,7 @@ def perform_heartbeat(
     job: Job | JobPydantic, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
 ) -> None:
     """
-    Performs heartbeat for the Job passed to it,optionally checking if it is necessary.
+    Perform heartbeat for the Job passed to it, optionally checking if it is necessary.
 
     :param job: job to perform heartbeat for
     :param heartbeat_callback: callback to run by the heartbeat
diff --git a/airflow/jobs/local_task_job_runner.py b/airflow/jobs/local_task_job_runner.py
index 6184a3e7fc..c142504639 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -111,13 +111,13 @@ class LocalTaskJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin):
         self.task_runner = get_task_runner(self)
 
         def signal_handler(signum, frame):
-            """Setting kill signal handler."""
+            """Set kill signal handler."""
             self.log.error("Received SIGTERM. Terminating subprocesses")
             self.task_runner.terminate()
             self.handle_task_exit(128 + signum)
 
         def segfault_signal_handler(signum, frame):
-            """Setting sigmentation violation signal handler."""
+            """Set sigmentation violation signal handler."""
             self.log.critical(SIGSEGV_MESSAGE)
             self.task_runner.terminate()
             self.handle_task_exit(128 + signum)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 0ed7aae5ce..ffcaf4f1a3 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -240,7 +240,7 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
         signal.signal(signal.SIGUSR2, self._debug_dump)
 
     def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
-        """Helper method to clean up processor_agent to avoid leaving orphan processes."""
+        """Clean up processor_agent to avoid leaving orphan processes."""
         if not _is_parent_process():
             # Only the parent process should perform the cleanup.
             return
@@ -905,7 +905,7 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
 
     def _run_scheduler_loop(self) -> None:
         """
-        The actual scheduler loop.
+        Harvest DAG parsing results, queue tasks, and perform executor heartbeat; the actual scheduler loop.
 
         The main steps in the loop are:
             #. Harvest DAG parsing results through DagFileProcessorAgent
@@ -1021,7 +1021,7 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
 
     def _do_scheduling(self, session: Session) -> int:
         """
-        This function is where the main scheduling decisions take places.
+        Make the main scheduling decisions.
 
         It:
         - Creates any necessary DAG runs by examining the next_dagrun_create_after column of DagModel
@@ -1378,7 +1378,7 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
         dag_runs: Iterable[DagRun],
         session: Session,
     ) -> list[tuple[DagRun, DagCallbackRequest | None]]:
-        """Makes scheduling decisions for all `dag_runs`."""
+        """Make scheduling decisions for all `dag_runs`."""
         callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
         guard.commit()
         return callback_tuples
@@ -1504,7 +1504,7 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
             self.log.debug("callback is empty")
 
     def _send_sla_callbacks_to_processor(self, dag: DAG) -> None:
-        """Sends SLA Callbacks to DagFileProcessor if tasks have SLAs set and check_slas=True."""
+        """Send SLA Callbacks to DagFileProcessor if tasks have SLAs set and check_slas=True."""
         if not settings.CHECK_SLAS:
             return
 
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index 77fe95ed7f..81ac8c178f 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -294,14 +294,18 @@ class TriggererJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin):
     @provide_session
     def is_needed(cls, session) -> bool:
         """
-        Tests if the triggerer job needs to be run (i.e., if there are triggers in the trigger table).
+        Test if the triggerer job needs to be run (i.e., if there are triggers in the trigger table).
 
         This is used for the warning boxes in the UI.
         """
         return session.query(func.count(Trigger.id)).scalar() > 0
 
     def on_kill(self):
-        """Called when there is an external kill command (via the heartbeat mechanism, for example)."""
+        """
+        Stop the trigger runner.
+
+        Called when there is an external kill command (via the heartbeat mechanism, for example).
+        """
         self.trigger_runner.stop = True
 
     def _kill_listener(self):
@@ -311,7 +315,7 @@ class TriggererJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin):
             self.listener.stop()
 
     def _exit_gracefully(self, signum, frame) -> None:
-        """Helper method to clean up processor_agent to avoid leaving orphan processes."""
+        """Clean up processor_agent to avoid leaving orphan processes."""
         # The first time, try to exit nicely
         if not self.trigger_runner.stop:
             self.log.info("Exiting gracefully upon receiving signal %s", signum)
@@ -345,11 +349,7 @@ class TriggererJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin):
         return None
 
     def _run_trigger_loop(self) -> None:
-        """
-        The main-thread trigger loop.
-
-        This runs synchronously and handles all database reads/writes.
-        """
+        """Run synchronously and handle all database reads/writes; the main-thread trigger loop."""
         while not self.trigger_runner.stop:
             if not self.trigger_runner.is_alive():
                 self.log.error("Trigger runner thread has died! Exiting.")
@@ -386,7 +386,7 @@ class TriggererJobRunner(BaseJobRunner["Job | JobPydantic"], LoggingMixin):
 
     def handle_failed_triggers(self):
         """
-        Handles "failed" triggers. - ones that errored or exited before they sent an event.
+        Handle "failed" triggers. - ones that errored or exited before they sent an event.
 
         Task Instances that depend on them need failing.
         """
@@ -454,15 +454,14 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         self.job_id = None
 
     def run(self):
-        """Sync entrypoint - just runs arun in an async loop."""
+        """Sync entrypoint - just run a run in an async loop."""
         asyncio.run(self.arun())
 
     async def arun(self):
         """
-        Main (asynchronous) logic loop.
+        Run trigger addition/deletion/cleanup; main (asynchronous) logic loop.
 
-        The loop in here runs trigger addition/deletion/cleanup. Actual
-        triggers run in their own separate coroutines.
+        Actual triggers run in their own separate coroutines.
         """
         watchdog = asyncio.create_task(self.block_watchdog())
         last_status = time.time()
@@ -639,7 +638,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
 
     def update_triggers(self, requested_trigger_ids: set[int]):
         """
-        Called from the main thread to request that we update what triggers we're running.
+        Request that we update what triggers we're running.
 
         Works out the differences - ones to add, and ones to remove - then
         adds them to the deques so the subthread can actually mutate the running
@@ -706,7 +705,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
 
     def get_trigger_by_classpath(self, classpath: str) -> type[BaseTrigger]:
         """
-        Gets a trigger class by its classpath ("path.to.module.classname").
+        Get a trigger class by its classpath ("path.to.module.classname").
 
         Uses a cache dictionary to speed up lookups after the first time.
         """
diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/kube_client.py b/airflow/kubernetes/pre_7_4_0_compatibility/kube_client.py
index d2e791dbfd..393a3ce94f 100644
--- a/airflow/kubernetes/pre_7_4_0_compatibility/kube_client.py
+++ b/airflow/kubernetes/pre_7_4_0_compatibility/kube_client.py
@@ -52,7 +52,7 @@ except ImportError as e:
 
 def _enable_tcp_keepalive() -> None:
     """
-    This function enables TCP keepalive mechanism.
+    Enable TCP keepalive mechanism.
 
     This prevents urllib3 connection to hang indefinitely when idle connection
     is time-outed on services like cloud load balancers or firewalls.
@@ -95,7 +95,7 @@ def get_kube_client(
     config_file: str | None = None,
 ) -> client.CoreV1Api:
     """
-    Retrieves Kubernetes client.
+    Retrieve Kubernetes client.
 
     :param in_cluster: whether we are in cluster
     :param cluster_context: context of the cluster
diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py
index aaacc8ce45..a4cfda193c 100644
--- a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py
+++ b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py
@@ -164,7 +164,7 @@ class PodGenerator:
         self.extract_xcom = extract_xcom
 
     def gen_pod(self) -> k8s.V1Pod:
-        """Generates pod."""
+        """Generate pod."""
         warnings.warn("This function is deprecated. ", RemovedInAirflow3Warning)
         result = self.ud_pod
 
@@ -177,7 +177,7 @@ class PodGenerator:
 
     @staticmethod
     def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod:
-        """Adds sidecar."""
+        """Add sidecar."""
         warnings.warn(
             "This function is deprecated. "
             "Please use airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar instead"
@@ -193,7 +193,7 @@ class PodGenerator:
 
     @staticmethod
     def from_obj(obj) -> dict | k8s.V1Pod | None:
-        """Converts to pod from obj."""
+        """Convert to pod from obj."""
         if obj is None:
             return None
 
@@ -227,7 +227,7 @@ class PodGenerator:
 
     @staticmethod
     def from_legacy_obj(obj) -> k8s.V1Pod | None:
-        """Converts to pod from obj."""
+        """Convert to pod from obj."""
         if obj is None:
             return None
 
diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator_deprecated.py b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator_deprecated.py
index 8876556a8d..df1b78c025 100644
--- a/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator_deprecated.py
+++ b/airflow/kubernetes/pre_7_4_0_compatibility/pod_generator_deprecated.py
@@ -216,7 +216,7 @@ class PodGenerator:
         self.extract_xcom = extract_xcom
 
     def gen_pod(self) -> k8s.V1Pod:
-        """Generates pod."""
+        """Generate pod."""
         result = None
 
         if result is None:
@@ -234,7 +234,7 @@ class PodGenerator:
 
     @staticmethod
     def add_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod:
-        """Adds sidecar."""
+        """Add sidecar."""
         pod_cp = copy.deepcopy(pod)
         pod_cp.spec.volumes = pod.spec.volumes or []
         pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
@@ -246,7 +246,7 @@ class PodGenerator:
 
     @staticmethod
     def from_obj(obj) -> k8s.V1Pod | None:
-        """Converts to pod from obj."""
+        """Convert to pod from obj."""
         if obj is None:
             return None
 
diff --git a/airflow/kubernetes/pre_7_4_0_compatibility/secret.py b/airflow/kubernetes/pre_7_4_0_compatibility/secret.py
index 14295f5c7a..b02bbb3dd7 100644
--- a/airflow/kubernetes/pre_7_4_0_compatibility/secret.py
+++ b/airflow/kubernetes/pre_7_4_0_compatibility/secret.py
@@ -65,7 +65,7 @@ class Secret(K8SModel):
         self.key = key
 
     def to_env_secret(self) -> k8s.V1EnvVar:
-        """Stores es environment secret."""
+        """Store es environment secret."""
         return k8s.V1EnvVar(
             name=self.deploy_target,
             value_from=k8s.V1EnvVarSource(
@@ -74,11 +74,11 @@ class Secret(K8SModel):
         )
 
     def to_env_from_secret(self) -> k8s.V1EnvFromSource:
-        """Reads from environment to secret."""
+        """Read from environment to secret."""
         return k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=self.secret))
 
     def to_volume_secret(self) -> tuple[k8s.V1Volume, k8s.V1VolumeMount]:
-        """Converts to volume secret."""
+        """Convert to volume secret."""
         vol_id = f"secretvol{uuid.uuid4()}"
         volume = k8s.V1Volume(name=vol_id, secret=k8s.V1SecretVolumeSource(secret_name=self.secret))
         if self.items:
diff --git a/airflow/lineage/__init__.py b/airflow/lineage/__init__.py
index 22e7d82c09..e22f264fdb 100644
--- a/airflow/lineage/__init__.py
+++ b/airflow/lineage/__init__.py
@@ -38,7 +38,7 @@ log = logging.getLogger(__name__)
 
 
 def get_backend() -> LineageBackend | None:
-    """Gets the lineage backend if defined in the configs."""
+    """Get the lineage backend if defined in the configs."""
     clazz = conf.getimport("lineage", "backend", fallback=None)
 
     if clazz:
@@ -99,7 +99,7 @@ def apply_lineage(func: T) -> T:
 
 def prepare_lineage(func: T) -> T:
     """
-    Prepares the lineage inlets and outlets.
+    Prepare the lineage inlets and outlets.
 
     Inlets can be:
 
diff --git a/airflow/lineage/backend.py b/airflow/lineage/backend.py
index 29a755109c..1ccfa78b89 100644
--- a/airflow/lineage/backend.py
+++ b/airflow/lineage/backend.py
@@ -35,7 +35,7 @@ class LineageBackend:
         context: dict | None = None,
     ):
         """
-        Sends lineage metadata to a backend.
+        Send lineage metadata to a backend.
 
         :param operator: the operator executing a transformation on the inlets and outlets
         :param inlets: the inlets to this operator
diff --git a/airflow/listeners/spec/dagrun.py b/airflow/listeners/spec/dagrun.py
index d2ae1a6b78..3337f4b9a1 100644
--- a/airflow/listeners/spec/dagrun.py
+++ b/airflow/listeners/spec/dagrun.py
@@ -29,14 +29,14 @@ hookspec = HookspecMarker("airflow")
 
 @hookspec
 def on_dag_run_running(dag_run: DagRun, msg: str):
-    """Called when dag run state changes to RUNNING."""
+    """Execute when dag run state changes to RUNNING."""
 
 
 @hookspec
 def on_dag_run_success(dag_run: DagRun, msg: str):
-    """Called when dag run state changes to SUCCESS."""
+    """Execute when dag run state changes to SUCCESS."""
 
 
 @hookspec
 def on_dag_run_failed(dag_run: DagRun, msg: str):
-    """Called when dag run state changes to FAIL."""
+    """Execute when dag run state changes to FAIL."""
diff --git a/airflow/listeners/spec/lifecycle.py b/airflow/listeners/spec/lifecycle.py
index 6ab0aa3b5c..c5e3bb52e4 100644
--- a/airflow/listeners/spec/lifecycle.py
+++ b/airflow/listeners/spec/lifecycle.py
@@ -25,7 +25,7 @@ hookspec = HookspecMarker("airflow")
 @hookspec
 def on_starting(component):
     """
-    Called before Airflow component - jobs like scheduler, worker, or task runner starts.
+    Execute before an Airflow component - a job like the scheduler, worker, or task runner - starts.
 
     It's guaranteed this will be called before any other plugin method.
 
@@ -36,7 +36,7 @@ def on_starting(component):
 @hookspec
 def before_stopping(component):
     """
-    Called before Airflow component - jobs like scheduler, worker, or task runner stops.
+    Execute before an Airflow component - a job like the scheduler, worker, or task runner - stops.
 
     It's guaranteed this will be called after any other plugin method.
 
diff --git a/airflow/listeners/spec/taskinstance.py b/airflow/listeners/spec/taskinstance.py
index b87043a99d..03f0a00478 100644
--- a/airflow/listeners/spec/taskinstance.py
+++ b/airflow/listeners/spec/taskinstance.py
@@ -34,18 +34,18 @@ hookspec = HookspecMarker("airflow")
 def on_task_instance_running(
     previous_state: TaskInstanceState | None, task_instance: TaskInstance, session: Session | None
 ):
-    """Called when task state changes to RUNNING. previous_state can be None."""
+    """Execute when task state changes to RUNNING. previous_state can be None."""
 
 
 @hookspec
 def on_task_instance_success(
     previous_state: TaskInstanceState | None, task_instance: TaskInstance, session: Session | None
 ):
-    """Called when task state changes to SUCCESS. previous_state can be None."""
+    """Execute when task state changes to SUCCESS. previous_state can be None."""
 
 
 @hookspec
 def on_task_instance_failed(
     previous_state: TaskInstanceState | None, task_instance: TaskInstance, session: Session | None
 ):
-    """Called when task state changes to FAIL. previous_state can be None."""
+    """Execute when task state changes to FAIL. previous_state can be None."""