@@ -17,2165 +17,35 @@
 # specific language governing permissions and limitations
 # under the License.
 """Command-line interface."""
 from __future__ import annotations
 import argparse
-import json
-import os
-import textwrap
-from argparse import Action, ArgumentError, RawTextHelpFormatter
+from argparse import Action, RawTextHelpFormatter
 from functools import lru_cache
-from typing import Callable, Iterable, NamedTuple, Union
-import lazy_object_proxy
+from typing import Iterable
-from airflow import settings
-from airflow.cli.commands.legacy_commands import check_legacy_command
-from airflow.configuration import conf
+from airflow.cli.cli_config import (
+    ActionCommand,
+    Arg,
+    CLICommand,
+    DefaultHelpParser,
+    GroupCommand,
+    core_commands,
 from airflow.exceptions import AirflowException
-from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
 from airflow.executors.executor_loader import ExecutorLoader
-from airflow.utils.cli import ColorMode
 from airflow.utils.helpers import partition
-from airflow.utils.module_loading import import_string
-from airflow.utils.timezone import parse as parsedate
-def lazy_load_command(import_path: str) -> Callable:
-    """Create a lazy loader for command."""
-    _, _, name = import_path.rpartition(".")
-    def command(*args, **kwargs):
-        func = import_string(import_path)
-        return func(*args, **kwargs)
-    command.__name__ = name
-    return command
-class DefaultHelpParser(argparse.ArgumentParser):
-    """CustomParser to display help message."""
-    def _check_value(self, action, value):
-        """Override _check_value and check conditionally added command."""
-        if action.dest == "subcommand" and value == "celery":
-            executor = conf.get("core", "EXECUTOR")
-            if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
-                executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
-                classes = ()
-                try:
-                    from airflow.executors.celery_executor import CeleryExecutor
-                    classes += (CeleryExecutor,)
-                except ImportError:
-                    message = (
-                        "The celery subcommand requires that you pip install the celery module. "
-                        "To do it, run: pip install 'apache-airflow[celery]'"
-                    )
-                    raise ArgumentError(action, message)
-                try:
-                    from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
-                    classes += (CeleryKubernetesExecutor,)
-                except ImportError:
-                    pass
-                if not issubclass(executor_cls, classes):
-                    message = (
-                        f"celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and "
-                        f"executors derived from them, your current executor: {executor}, subclassed from: "
-                        f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}'
-                    )
-                    raise ArgumentError(action, message)
-        if action.dest == "subcommand" and value == "kubernetes":
-            try:
-                import kubernetes.client  # noqa: F401
-            except ImportError:
-                message = (
-                    "The kubernetes subcommand requires that you pip install the kubernetes python client. "
-                    "To do it, run: pip install 'apache-airflow[cncf.kubernetes]'"
-                )
-                raise ArgumentError(action, message)
-        if action.choices is not None and value not in action.choices:
-            check_legacy_command(action, value)
-        super()._check_value(action, value)
-    def error(self, message):
-        """Override error and use print_instead of print_usage."""
-        self.print_help()
-        self.exit(2, f"\n{self.prog} command error: {message}, see help above.\n")
-# Used in Arg to enable `None' as a distinct value from "not passed"
-_UNSET = object()
-class Arg:
-    """Class to keep information about command line argument."""
-    def __init__(
-        self,
-        flags=_UNSET,
-        help=_UNSET,
-        action=_UNSET,
-        default=_UNSET,
-        nargs=_UNSET,
-        type=_UNSET,
-        choices=_UNSET,
-        required=_UNSET,
-        metavar=_UNSET,
-        dest=_UNSET,
-    ):
-        self.flags = flags
-        self.kwargs = {}
-        for k, v in locals().items():
-            if v is _UNSET:
-                continue
-            if k in ("self", "flags"):
-                continue
-            self.kwargs[k] = v
-    def add_to_parser(self, parser: argparse.ArgumentParser):
-        """Add this argument to an ArgumentParser."""
-        parser.add_argument(*self.flags, **self.kwargs)
-def positive_int(*, allow_zero):
-    """Define a positive int type for an argument."""
-    def _check(value):
-        try:
-            value = int(value)
-            if allow_zero and value == 0:
-                return value
-            if value > 0:
-                return value
-        except ValueError:
-            pass
-        raise argparse.ArgumentTypeError(f"invalid positive int value: '{value}'")
-    return _check
-def string_list_type(val):
-    """Parses comma-separated list and returns list of string (strips whitespace)."""
-    return [x.strip() for x in val.split(",")]
-def string_lower_type(val):
-    """Lowers arg."""
-    if not val:
-        return
-    return val.strip().lower()
-# Shared
-ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag")
-ARG_TASK_ID = Arg(("task_id",), help="The id of the task")
-ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the DAG", type=parsedate)
-    ("execution_date",), nargs="?", help="The execution date of the DAG (optional)", type=parsedate
-    ("execution_date_or_run_id",), help="The execution_date of the DAG or run_id of the DAGRun"
-    ("execution_date_or_run_id",),
-    nargs="?",
-    help="The execution_date of the DAG or run_id of the DAGRun (optional)",
-    ("-t", "--task-regex"), help="The regex to filter specific task_ids to backfill (optional)"
-    ("-S", "--subdir"),
-    help=(
-        "File location or directory from which to look for the dag. "
-        "Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the "
-        "value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg' "
-    ),
-    default="[AIRFLOW_HOME]/dags" if BUILD_DOCS else settings.DAGS_FOLDER,
-ARG_START_DATE = Arg(("-s", "--start-date"), help="Override start_date YYYY-MM-DD", type=parsedate)
-ARG_END_DATE = Arg(("-e", "--end-date"), help="Override end_date YYYY-MM-DD", type=parsedate)
-    (
-        "-o",
-        "--output-path",
-    ),
-    help="The output for generated yaml files",
-    type=str,
-    default="[CWD]" if BUILD_DOCS else os.getcwd(),
-    ("-n", "--dry-run"),
-    help="Perform a dry run for each task. Only renders Template Fields for each task, nothing else",
-    action="store_true",
-ARG_PID = Arg(("--pid",), help="PID file location", nargs="?")
-    ("-D", "--daemon"), help="Daemonize instead of running in the foreground", action="store_true"
-ARG_STDERR = Arg(("--stderr",), help="Redirect stderr to this file")
-ARG_STDOUT = Arg(("--stdout",), help="Redirect stdout to this file")
-ARG_LOG_FILE = Arg(("-l", "--log-file"), help="Location of the log file")
-ARG_YES = Arg(
-    ("-y", "--yes"),
-    help="Do not prompt to confirm. Use with care!",
-    action="store_true",
-    default=False,
-    (
-        "-o",
-        "--output",
-    ),
-    help="Output format. Allowed values: json, yaml, plain, table (default: table)",
-    metavar="(table, json, yaml, plain)",
-    choices=("table", "json", "yaml", "plain"),
-    default="table",
-    ("--color",),
-    help="Do emit colored output (default: auto)",
-    choices={ColorMode.ON, ColorMode.OFF, ColorMode.AUTO},
-    default=ColorMode.AUTO,
-# DB args
-    ("-r", "--range"),
-    help="Version range(start:end) for offline sql generation. Example: '2.0.2:2.2.3'",
-    default=None,
-    ("--revision-range",),
-    help=(
-        "Migration revision range(start:end) to use for offline sql generation. "
-        "Example: ``a13f7613ad25:7b2661a43ba3``"
-    ),
-    default=None,
-# list_dag_runs
-    ("-d", "--dag-id"), required=True, help="The id of the dag"
-)  # TODO: convert this to a positional arg in Airflow 3
-    ("--no-backfill",), help="filter all the backfill dagruns given the dag id", action="store_true"
-ARG_STATE = Arg(("--state",), help="Only list the dag runs corresponding to the state")
-# list_jobs
-ARG_DAG_ID_OPT = Arg(("-d", "--dag-id"), help="The id of the dag")
-ARG_LIMIT = Arg(("--limit",), help="Return a limited number of records")
-# next_execution
-    ("-n", "--num-executions"),
-    default=1,
-    type=positive_int(allow_zero=False),
-    help="The number of next execution datetimes to show",
-# backfill
-    ("-m", "--mark-success"), help="Mark jobs as succeeded without running them", action="store_true"
-ARG_VERBOSE = Arg(("-v", "--verbose"), help="Make logging output more verbose", action="store_true")
-ARG_LOCAL = Arg(("-l", "--local"), help="Run the task using the LocalExecutor", action="store_true")
-    ("-x", "--donot-pickle"),
-    help=(
-        "Do not attempt to pickle the DAG object to send over "
-        "to the workers, just tell the workers to run their version "
-        "of the code"
-    ),
-    action="store_true",
-    ("-i", "--ignore-dependencies"),
-    help=(
-        "Skip upstream tasks, run only the tasks "
-        "matching the regexp. Only works in conjunction "
-        "with task_regex"
-    ),
-    action="store_true",
-    ("-I", "--ignore-first-depends-on-past"),
-    help=(
-        "Ignores depends_on_past dependencies for the first "
-        "set of tasks only (subsequent executions in the backfill "
-        "DO respect depends_on_past)"
-    ),
-    action="store_true",
-ARG_POOL = Arg(("--pool",), "Resource pool to use")
-    ("--delay-on-limit",),
-    help=(
-        "Amount of time in seconds to wait when the limit "
-        "on maximum active dag runs (max_active_runs) has "
-        "been reached before trying to execute a dag run "
-        "again"
-    ),
-    type=float,
-    default=1.0,
-    ("--reset-dagruns",),
-    help=(
-        "if set, the backfill will delete existing "
-        "backfill-related DAG runs and start "
-        "anew with fresh, running DAG runs"
-    ),
-    action="store_true",
-    ("--rerun-failed-tasks",),
-    help=(
-        "if set, the backfill will auto-rerun "
-        "all the failed tasks for the backfill date range "
-        "instead of throwing exceptions"
-    ),
-    action="store_true",
-    ("--continue-on-failures",),
-    help=("if set, the backfill will keep going even if some of the tasks failed"),
-    action="store_true",
-    ("--disable-retry",),
-    help=("if set, the backfill will set tasks as failed without retrying."),
-    action="store_true",
-    (
-        "-B",
-        "--run-backwards",
-    ),
-    help=(
-        "if set, the backfill will run tasks from the most "
-        "recent day first.  if there are tasks that depend_on_past "
-        "this option will throw an exception"
-    ),
-    action="store_true",
-    ("--treat-dag-as-regex",),
-    help=("if set, dag_id will be treated as regex instead of an exact string"),
-    action="store_true",
-# test_dag
-    ("--show-dagrun",),
-    help=(
-        "After completing the backfill, shows the diagram for current DAG Run.\n"
-        "\n"
-        "The diagram is in DOT language\n"
-    ),
-    action="store_true",
-    ("--imgcat-dagrun",),
-    help=(
-        "After completing the dag run, prints a diagram on the screen for the "
-        "current DAG Run using the imgcat tool.\n"
-    ),
-    action="store_true",
-    ("--save-dagrun",),
-    help="After completing the backfill, saves the diagram for current DAG Run to the indicated file.\n\n",
-# list_tasks
-ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true")
-# tasks_run
-# This is a hidden option -- not meant for users to set or know about
-    ("--no-shut-down-logging",),
-    help=argparse.SUPPRESS,
-    dest="shut_down_logging",
-    action="store_false",
-    default=True,
-# clear
-ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks", action="store_true")
-ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs", action="store_true")
-ARG_ONLY_RUNNING = Arg(("-r", "--only-running"), help="Only running jobs", action="store_true")
-ARG_DOWNSTREAM = Arg(("-d", "--downstream"), help="Include downstream tasks", action="store_true")
-ARG_EXCLUDE_SUBDAGS = Arg(("-x", "--exclude-subdags"), help="Exclude subdags", action="store_true")
-    ("-X", "--exclude-parentdag"),
-    help="Exclude ParentDAGS if the task cleared is a part of a SubDAG",
-    action="store_true",
-    ("-R", "--dag-regex"), help="Search dag_id as regex instead of exact string", action="store_true"
-# show_dag
-ARG_SAVE = Arg(("-s", "--save"), help="Saves the result to the indicated file.")
-ARG_IMGCAT = Arg(("--imgcat",), help="Displays graph using the imgcat tool.", action="store_true")
-# trigger_dag
-ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run")
-ARG_CONF = Arg(("-c", "--conf"), help="JSON string that gets pickled into the DagRun's conf attribute")
-ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
-    ("--no-replace-microseconds",),
-    help="whether microseconds should be zeroed",
-    dest="replace_microseconds",
-    action="store_false",
-    default=True,
-# db
-    ("-t", "--tables"),
-    help=lazy_object_proxy.Proxy(
-        lambda: f"Table names to perform maintenance on (use comma-separated list).\n"
-        f"Options: {import_string('airflow.cli.commands.db_command.all_tables')}"
-    ),
-    type=string_list_type,
-    ("--clean-before-timestamp",),
-    help="The date or timestamp before which data should be purged.\n"
-    "If no timezone info is supplied then dates are assumed to be in airflow default timezone.\n"
-    "Example: '2022-01-01 00:00:00+01:00'",
-    type=parsedate,
-    required=True,
-    ("--dry-run",),
-    help="Perform a dry run",
-    action="store_true",
-    ("--skip-archive",),
-    help="Don't preserve purged records in an archive table.",
-    action="store_true",
-# pool
-ARG_POOL_NAME = Arg(("pool",), metavar="NAME", help="Pool name")
-ARG_POOL_SLOTS = Arg(("slots",), type=int, help="Pool slots")
-ARG_POOL_DESCRIPTION = Arg(("description",), help="Pool description")
-    ("file",),
-    metavar="FILEPATH",
-    help="Import pools from JSON file. Example format::\n"
-    + textwrap.indent(
-        textwrap.dedent(
-            """
-            {
-                "pool_1": {"slots": 5, "description": ""},
-                "pool_2": {"slots": 10, "description": "test"}
-            }"""
-        ),
-        " " * 4,
-    ),
-ARG_POOL_EXPORT = Arg(("file",), metavar="FILEPATH", help="Export all pools to JSON file")
-# variables
-ARG_VAR = Arg(("key",), help="Variable key")
-ARG_VAR_VALUE = Arg(("value",), metavar="VALUE", help="Variable value")
-    ("-d", "--default"), metavar="VAL", default=None, help="Default value returned if variable does not exist"
-ARG_DESERIALIZE_JSON = Arg(("-j", "--json"), help="Deserialize JSON variable", action="store_true")
-ARG_SERIALIZE_JSON = Arg(("-j", "--json"), help="Serialize JSON variable", action="store_true")
-ARG_VAR_IMPORT = Arg(("file",), help="Import variables from JSON file")
-ARG_VAR_EXPORT = Arg(("file",), help="Export all variables to JSON file")
-# kerberos
-ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs="?")
-ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs="?", default=conf.get("kerberos", "keytab"))
-# run
-    ("-N", "--interactive"),
-    help="Do not capture standard output and error streams (useful for interactive debugging)",
-    action="store_true",
-# TODO(aoen): "force" is a poor choice of name here since it implies it overrides
-# all dependencies (not just past success), e.g. the ignore_depends_on_past
-# dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
-# the "ignore_all_dependencies" command should be called the"force" command
-# instead.
-    ("-f", "--force"),
-    help="Ignore previous task instance state, rerun regardless if task already succeeded/failed",
-    action="store_true",
-ARG_RAW = Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true")
-    ("-A", "--ignore-all-dependencies"),
-    help="Ignores all non-critical dependencies, including ignore_ti_state and ignore_task_deps",
-    action="store_true",
-# TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
-# vague (e.g. a task being in the appropriate state to be run is also a dependency
-# but is not ignored by this flag), the name 'ignore_task_dependencies' is
-# slightly better (as it ignores all dependencies that are specific to the task),
-# so deprecate the old command name and use this instead.
-    ("-i", "--ignore-dependencies"),
-    help="Ignore task-specific dependencies, e.g. upstream, depends_on_past, and retry delay dependencies",
-    action="store_true",
-    ("-I", "--ignore-depends-on-past"),
-    help="Deprecated -- use `--depends-on-past ignore` instead. "
-    "Ignore depends_on_past dependencies (but respect upstream dependencies)",
-    action="store_true",
-    ("-d", "--depends-on-past"),
-    help="Determine how Airflow should deal with past dependencies. The default action is `check`, Airflow "
-    "will check if the the past dependencies are met for the tasks having `depends_on_past=True` before run "
-    "them, if `ignore` is provided, the past dependencies will be ignored, if `wait` is provided and "
-    "`depends_on_past=True`, Airflow will wait the past dependencies until they are met before running or "
-    "skipping the task",
-    choices={"check", "ignore", "wait"},
-    default="check",
-    ("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the worker", action="store_true"
-ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)")
-ARG_JOB_ID = Arg(("-j", "--job-id"), help=argparse.SUPPRESS)
-ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
-ARG_MAP_INDEX = Arg(("--map-index",), type=int, default=-1, help="Mapped task index")
-# database
-    ("-t", "--migration-wait-timeout"),
-    help="timeout to wait for db to migrate ",
-    type=int,
-    default=60,
-    ("--no-reserialize-dags",),
-    # Not intended for user, so dont show in help
-    help=argparse.SUPPRESS,
-    action="store_false",
-    default=True,
-    dest="reserialize_dags",
-    ("-n", "--to-version"),
-    help=(
-        "(Optional) The airflow version to upgrade to. Note: must provide either "
-        "`--to-revision` or `--to-version`."
-    ),
-    ("-r", "--to-revision"),
-    help="(Optional) If provided, only run migrations up to and including this Alembic revision.",
-    ("-n", "--to-version"),
-    help="(Optional) If provided, only run migrations up to this version.",
-    ("--from-version",),
-    help="(Optional) If generating sql, may supply a *from* version",
-    ("-r", "--to-revision"),
-    help="The Alembic revision to downgrade to. Note: must provide either `--to-revision` or `--to-version`.",
-    ("--from-revision",),
-    help="(Optional) If generating sql, may supply a *from* Alembic revision",
-    ("-s", "--show-sql-only"),
-    help="Don't actually run migrations; just print out sql scripts for offline migration. "
-    "Required if using either `--from-revision` or `--from-version`.",
-    action="store_true",
-    default=False,
-    ("-s", "--skip-init"),
-    help="Only remove tables; do not perform db init.",
-    action="store_true",
-    default=False,
-# webserver
-ARG_PORT = Arg(
-    ("-p", "--port"),
-    default=conf.get("webserver", "WEB_SERVER_PORT"),
-    type=int,
-    help="The port on which to run the server",
-    ("--ssl-cert",),
-    default=conf.get("webserver", "WEB_SERVER_SSL_CERT"),
-    help="Path to the SSL certificate for the webserver",
-    ("--ssl-key",),
-    default=conf.get("webserver", "WEB_SERVER_SSL_KEY"),
-    help="Path to the key to use with the SSL certificate",
-    ("-w", "--workers"),
-    default=conf.get("webserver", "WORKERS"),
-    type=int,
-    help="Number of workers to run the webserver on",
-    ("-k", "--workerclass"),
-    default=conf.get("webserver", "WORKER_CLASS"),
-    choices=["sync", "eventlet", "gevent", "tornado"],
-    help="The worker class to use for Gunicorn",
-    ("-t", "--worker-timeout"),
-    default=conf.get("webserver", "WEB_SERVER_WORKER_TIMEOUT"),
-    type=int,
-    help="The timeout for waiting on webserver workers",
-    ("-H", "--hostname"),
-    default=conf.get("webserver", "WEB_SERVER_HOST"),
-    help="Set the hostname on which to run the web server",
-    ("-d", "--debug"), help="Use the server that ships with Flask in debug mode", action="store_true"
-    ("-A", "--access-logfile"),
-    default=conf.get("webserver", "ACCESS_LOGFILE"),
-    help="The logfile to store the webserver access log. Use '-' to print to stdout",
-    ("-E", "--error-logfile"),
-    default=conf.get("webserver", "ERROR_LOGFILE"),
-    help="The logfile to store the webserver error log. Use '-' to print to stderr",
-    ("-L", "--access-logformat"),
-    default=conf.get("webserver", "ACCESS_LOGFORMAT"),
-    help="The access log format for gunicorn logs",
-# scheduler
-    ("-n", "--num-runs"),
-    default=conf.getint("scheduler", "num_runs"),
-    type=int,
-    help="Set the number of runs to execute before exiting",
-    ("-p", "--do-pickle"),
-    default=False,
-    help=(
-        "Attempt to pickle the DAG object to send over "
-        "to the workers, instead of letting workers run their version "
-        "of the code"
-    ),
-    action="store_true",
-# worker
-    ("-q", "--queues"),
-    help="Comma delimited list of queues to serve",
-    default=conf.get("operators", "DEFAULT_QUEUE"),
-    ("-c", "--concurrency"),
-    type=int,
-    help="The number of worker processes",
-    default=conf.get("celery", "worker_concurrency"),
-    ("-H", "--celery-hostname"),
-    help="Set the hostname of celery worker if you have multiple workers on a single machine",
-    ("-u", "--umask"),
-    help="Set the umask of celery worker in daemon mode",
-    ("--without-mingle",),
-    default=False,
-    help="Don't synchronize with other workers at start-up",
-    action="store_true",
-    ("--without-gossip",),
-    default=False,
-    help="Don't subscribe to other workers events",
-    action="store_true",
-# flower
-ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API")
-    ("-H", "--hostname"),
-    default=conf.get("celery", "FLOWER_HOST"),
-    help="Set the hostname on which to run the server",
-    ("-p", "--port"),
-    default=conf.get("celery", "FLOWER_PORT"),
-    type=int,
-    help="The port on which to run the server",
-ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower")
-    ("-u", "--url-prefix"), default=conf.get("celery", "FLOWER_URL_PREFIX"), help="URL prefix for Flower"
-    ("-A", "--basic-auth"),
-    default=conf.get("celery", "FLOWER_BASIC_AUTH"),
-    help=(
-        "Securing Flower with Basic Authentication. "
-        "Accepts user:password pairs separated by a comma. "
-        "Example: flower_basic_auth = user1:password1,user2:password2"
-    ),
-ARG_TASK_PARAMS = Arg(("-t", "--task-params"), help="Sends a JSON params dict to the task")
-    ("-m", "--post-mortem"), action="store_true", help="Open debugger on uncaught exception"
-    ("--env-vars",),
-    help="Set env var in both parsing time and runtime for each of entry supplied in a JSON dict",
-    type=json.loads,
-# connections
-ARG_CONN_ID = Arg(("conn_id",), help="Connection id, required to get/add/delete a connection", type=str)
-    ("--conn-id",), help="If passed, only items with the specified connection ID will be displayed", type=str
-    ("--conn-uri",), help="Connection URI, required to add a connection without conn_type", type=str
-    ("--conn-json",), help="Connection JSON, required to add a connection using JSON representation", type=str
-    ("--conn-type",), help="Connection type, required to add a connection without conn_uri", type=str
-    ("--conn-description",), help="Connection description, optional when adding a connection", type=str
-ARG_CONN_HOST = Arg(("--conn-host",), help="Connection host, optional when adding a connection", type=str)
-ARG_CONN_LOGIN = Arg(("--conn-login",), help="Connection login, optional when adding a connection", type=str)
-    ("--conn-password",), help="Connection password, optional when adding a connection", type=str
-    ("--conn-schema",), help="Connection schema, optional when adding a connection", type=str
-ARG_CONN_PORT = Arg(("--conn-port",), help="Connection port, optional when adding a connection", type=str)
-    ("--conn-extra",), help="Connection `Extra` field, optional when adding a connection", type=str
-    ("file",),
-    help="Output file path for exporting the connections",
-    type=argparse.FileType("w", encoding="UTF-8"),
-    ("--format",),
-    help="Deprecated -- use `--file-format` instead. File format to use for the export.",
-    type=str,
-    choices=["json", "yaml", "env"],
-    ("--file-format",), help="File format for the export", type=str, choices=["json", "yaml", "env"]
-    ("--serialization-format",),
-    help="When exporting as `.env` format, defines how connections should be serialized. Default is `uri`.",
-    type=string_lower_type,
-    choices=["json", "uri"],
-ARG_CONN_IMPORT = Arg(("file",), help="Import connections from a file")
-    ("--overwrite",),
-    help="Overwrite existing entries if a conflict occurs",
-    required=False,
-    action="store_true",
-# providers
-    ("provider_name",), help="Provider name, required to get provider information", type=str
-ARG_FULL = Arg(
-    ("-f", "--full"),
-    help="Full information about the provider, including documentation information.",
-    required=False,
-    action="store_true",
-# users
-ARG_USERNAME = Arg(("-u", "--username"), help="Username of the user", required=True, type=str)
-ARG_USERNAME_OPTIONAL = Arg(("-u", "--username"), help="Username of the user", type=str)
-ARG_FIRSTNAME = Arg(("-f", "--firstname"), help="First name of the user", required=True, type=str)
-ARG_LASTNAME = Arg(("-l", "--lastname"), help="Last name of the user", required=True, type=str)
-ARG_ROLE = Arg(
-    ("-r", "--role"),
-    help="Role of the user. Existing roles include Admin, User, Op, Viewer, and Public",
-    required=True,
-    type=str,
-ARG_EMAIL = Arg(("-e", "--email"), help="Email of the user", required=True, type=str)
-ARG_EMAIL_OPTIONAL = Arg(("-e", "--email"), help="Email of the user", type=str)
-    ("-p", "--password"),
-    help="Password of the user, required to create a user without --use-random-password",
-    type=str,
-    ("--use-random-password",),
-    help="Do not prompt for password. Use random string instead."
-    " Required to create a user without --password ",
-    default=False,
-    action="store_true",
-    ("import",),
-    metavar="FILEPATH",
-    help="Import users from JSON file. Example format::\n"
-    + textwrap.indent(
-        textwrap.dedent(
-            """
-            [
-                {
-                    "email": "",
-                    "firstname": "Jon",
-                    "lastname": "Doe",
-                    "roles": ["Public"],
-                    "username": "jondoe"
-                }
-            ]"""
-        ),
-        " " * 4,
-    ),
-ARG_USER_EXPORT = Arg(("export",), metavar="FILEPATH", help="Export all users to JSON file")
-# roles
-ARG_CREATE_ROLE = Arg(("-c", "--create"), help="Create a new role", action="store_true")
-ARG_LIST_ROLES = Arg(("-l", "--list"), help="List roles", action="store_true")
-ARG_ROLES = Arg(("role",), help="The name of a role", nargs="*")
-ARG_PERMISSIONS = Arg(("-p", "--permission"), help="Show role permissions", action="store_true")
-ARG_ROLE_RESOURCE = Arg(("-r", "--resource"), help="The name of permissions", nargs="*", required=True)
-ARG_ROLE_ACTION = Arg(("-a", "--action"), help="The action of permissions", nargs="*")
-ARG_ROLE_ACTION_REQUIRED = Arg(("-a", "--action"), help="The action of permissions", nargs="*", required=True)
-ARG_AUTOSCALE = Arg(("-a", "--autoscale"), help="Minimum and Maximum number of worker to autoscale")
-    ("-s", "--skip-serve-logs"),
-    default=False,
-    help="Don't start the serve logs process along with the workers",
-    action="store_true",
-ARG_ROLE_IMPORT = Arg(("file",), help="Import roles from JSON file", nargs=None)
-ARG_ROLE_EXPORT = Arg(("file",), help="Export all roles to JSON file", nargs=None)
-    ("-p", "--pretty"),
-    help="Format output JSON file by sorting role names and indenting by 4 spaces",
-    action="store_true",
-# info
-    ("--anonymize",),
-    help="Minimize any personal identifiable information. Use it when sharing output with others.",
-    action="store_true",
-    ("--file-io",), help="Send output to service and returns link.", action="store_true"
-# config
-    ("section",),
-    help="The section name",
-    ("option",),
-    help="The option name",
-    ("--section",),
-    help="The section name",
-# kubernetes cleanup-pods
-    ("--namespace",),
-    default=conf.get("kubernetes_executor", "namespace"),
-    help="Kubernetes Namespace. Default value is `[kubernetes] namespace` in configuration.",
-    ("--min-pending-minutes",),
-    default=30,
-    type=positive_int(allow_zero=False),
-    help=(
-        "Pending pods created before the time interval are to be cleaned up, "
-        "measured in minutes. Default value is 30(m). The minimum value is 5(m)."
-    ),
-# jobs check
-    ("--job-type",),
-    choices=("BackfillJob", "LocalTaskJob", "SchedulerJob", "TriggererJob"),
-    action="store",
-    help="The type of job(s) that will be checked.",
-    ("--hostname",),
-    default=None,
-    type=str,
-    help="The hostname of job(s) that will be checked.",
-    ("--local",),
-    action="store_true",
-    help="If passed, this command will only show jobs from the local host "
-    "(those with a hostname matching what `hostname_callable` returns).",
-    ("--limit",),
-    default=1,
-    type=positive_int(allow_zero=True),
-    help="The number of recent jobs that will be checked. To disable limit, set 0. ",
-    ("--allow-multiple",),
-    action="store_true",
-    help="If passed, this command will be successful even if multiple matching alive jobs are found.",
-# sync-perm
-    ("--include-dags",), help="If passed, DAG specific permissions will also be synced.", action="store_true"
-# triggerer
-    ("--capacity",),
-    type=positive_int(allow_zero=False),
-    help="The maximum number of triggers that a Triggerer will run at one time.",
-# reserialize
-    ("--clear-only",),
-    action="store_true",
-    help="If passed, serialized DAGs will be cleared but not reserialized.",
+airflow_commands = core_commands
+for executor in ExecutorLoader.import_all_executor_classes(silent=True):

Review Comment:
   I really appreciate the time to read the thread and send a thoughtful reply @potiuk :pray: 
   Unless anyone else disagrees I'll go ahead with this approach. 
   There will also need to be a followup PR which removes some of the error handling we attempt to do (basically the thing that produces that message you ran into). Core airflow should not be attempting to do this, I think we agree on that, but correct me if not.

