Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/02 17:21:02 UTC

[airflow] branch v1-10-stable updated: Add new-style 2.0 command names for Airflow 1.10.x (#12725)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-stable by this push:
     new 5fbd0ed  Add new-style 2.0 command names for Airflow 1.10.x (#12725)
5fbd0ed is described below

commit 5fbd0ed4869c2f0b0fcdc63028ee2109f10a61c7
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Dec 2 17:20:09 2020 +0000

    Add new-style 2.0 command names for Airflow 1.10.x (#12725)
    
    This introduces the new style commands for 1.10, but keeps all the
    existing commands working, they just don't show in the top level
    `airflow --help` anymore.
    
    This was done by taking the argparse groups/sub-commands from master
    (along with supporting functions) and copying it here, then providing
    needed shims for compat.
    
    I have not added any new sub-commands, nor changed any existing
    behaviours, so the 1.10 commands won't change in behaviour.
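    
    For example, after this change both of the following invocations work (the
    old one keeps its 1.10 behaviour), but only the new one is listed in the
    top-level `airflow --help`:
    
        airflow list_dags    # old-style name, still supported
        airflow dags list    # new-style 2.0 name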
---
 UPDATING.md                             |   57 ++
 airflow/bin/cli.py                      | 1218 +++++++++++++++++++++++++++++--
 airflow/utils/cli.py                    |   65 +-
 tests/cli/test_cli.py                   |    2 +
 tests/cli/test_worker_initialisation.py |    2 +-
 5 files changed, 1277 insertions(+), 67 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index c29c8a3..577b644 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -25,6 +25,7 @@ assists users migrating to a new version.
 <!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
 **Table of contents**
 
+- [Airflow 1.10.14](#airflow-11014)
 - [Airflow 1.10.13](#airflow-11013)
 - [Airflow 1.10.12](#airflow-11012)
 - [Airflow 1.10.11](#airflow-11011)
@@ -61,6 +62,62 @@ More tips can be found in the guide:
 https://developers.google.com/style/inclusive-documentation
 
 -->
+## Airflow 1.10.14
+
+### Airflow CLI changes in line with 2.0
+
+The Airflow CLI has been organized so that related commands are grouped together as subcommands,
+which means that if you use these commands in your scripts, you have to make changes to them.
+
+This section describes the changes that have been made, and what you need to do to update your script.
+
+The ability to manipulate users from the command line has been changed. ``airflow create_user``, ``airflow delete_user``
+and ``airflow list_users`` have been grouped into a single command `airflow users` with the subcommands `create`, `list` and `delete`.
+
+The `airflow list_dags` command is now `airflow dags list`, `airflow pause` is `airflow dags pause`, etc.
+
+In Airflow 1.10 and 2.0 there is an `airflow config` command but there is a difference in behavior. In Airflow 1.10,
+it prints all config options while in Airflow 2.0, it's a command group. `airflow config` is now `airflow config list`.
+You can check the other options by running `airflow config --help`.
+
+Compatibility with the old CLI has been maintained, but the old commands no longer appear in the top-level help.
+
+You can learn about the commands by running ``airflow --help``. For example, to get help about the ``celery`` group command,
+run ``airflow celery --help``.
+
+| Old command                   | New command                        |     Group          |
+|-------------------------------|------------------------------------|--------------------|
+| ``airflow worker``            | ``airflow celery worker``          |    ``celery``      |
+| ``airflow flower``            | ``airflow celery flower``          |    ``celery``      |
+| ``airflow trigger_dag``       | ``airflow dags trigger``           |    ``dags``        |
+| ``airflow delete_dag``        | ``airflow dags delete``            |    ``dags``        |
+| ``airflow show_dag``          | ``airflow dags show``              |    ``dags``        |
+| ``airflow list_dags``         | ``airflow dags list``              |    ``dags``        |
+| ``airflow dag_status``        | ``airflow dags status``            |    ``dags``        |
+| ``airflow backfill``          | ``airflow dags backfill``          |    ``dags``        |
+| ``airflow list_dag_runs``     | ``airflow dags list-runs``         |    ``dags``        |
+| ``airflow pause``             | ``airflow dags pause``             |    ``dags``        |
+| ``airflow unpause``           | ``airflow dags unpause``           |    ``dags``        |
+| ``airflow next_execution``    | ``airflow dags next-execution``    |    ``dags``        |
+| ``airflow test``              | ``airflow tasks test``             |    ``tasks``       |
+| ``airflow clear``             | ``airflow tasks clear``            |    ``tasks``       |
+| ``airflow list_tasks``        | ``airflow tasks list``             |    ``tasks``       |
+| ``airflow task_failed_deps``  | ``airflow tasks failed-deps``      |    ``tasks``       |
+| ``airflow task_state``        | ``airflow tasks state``            |    ``tasks``       |
+| ``airflow run``               | ``airflow tasks run``              |    ``tasks``       |
+| ``airflow render``            | ``airflow tasks render``           |    ``tasks``       |
+| ``airflow initdb``            | ``airflow db init``                |     ``db``         |
+| ``airflow resetdb``           | ``airflow db reset``               |     ``db``         |
+| ``airflow upgradedb``         | ``airflow db upgrade``             |     ``db``         |
+| ``airflow checkdb``           | ``airflow db check``               |     ``db``         |
+| ``airflow shell``             | ``airflow db shell``               |     ``db``         |
+| ``airflow pool``              | ``airflow pools``                  |     ``pools``      |
+| ``airflow create_user``       | ``airflow users create``           |     ``users``      |
+| ``airflow delete_user``       | ``airflow users delete``           |     ``users``      |
+| ``airflow list_users``        | ``airflow users list``             |     ``users``      |
+| ``airflow rotate_fernet_key`` | ``airflow rotate-fernet-key``      |                    |
+| ``airflow sync_perm``         | ``airflow sync-perm``              |                    |
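+
+For example, a shell script or cron entry that calls the old commands can be updated as follows
+(`example_dag` is an illustrative DAG id; the old form keeps working in 1.10.14 but is hidden from
+``airflow --help``):
+
+```bash
+# Old-style commands (still accepted)
+airflow pause example_dag
+airflow trigger_dag example_dag
+
+# New-style 2.0 commands
+airflow dags pause example_dag
+airflow dags trigger example_dag
+```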
+
 ## Airflow 1.10.13
 
 ### TimeSensor is now timezone aware
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 37f70c4..c22e847 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -22,6 +22,7 @@ from __future__ import print_function
 import errno
 import hashlib
 import importlib
+import itertools
 import locale
 import logging
 
@@ -32,7 +33,7 @@ import textwrap
 import random
 import string
 import yaml
-from collections import OrderedDict
+from collections import OrderedDict, namedtuple
 from importlib import import_module
 
 import getpass
@@ -59,7 +60,7 @@ import threading
 import time
 import traceback
 
-from typing import Any
+from typing import Any, cast
 
 import airflow
 from airflow import api
@@ -100,7 +101,9 @@ log = logging.getLogger(__name__)
 
 DAGS_FOLDER = settings.DAGS_FOLDER
 
-if "BUILDING_AIRFLOW_DOCS" in os.environ:
+BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
+
+if BUILD_DOCS:
     DAGS_FOLDER = '[AIRFLOW_HOME]/dags'
 
 
@@ -238,6 +241,7 @@ def backfill(args, dag=None):
         )
 
 
+@cli_utils.deprecated_action(new_name='dags trigger')
 @cli_utils.action_logging
 def trigger_dag(args):
     """
@@ -256,6 +260,7 @@ def trigger_dag(args):
         raise AirflowException(err)
 
 
+@cli_utils.deprecated_action(new_name='dags delete')
 @cli_utils.action_logging
 def delete_dag(args):
     """
@@ -276,6 +281,41 @@ def delete_dag(args):
         print("Bail.")
 
 
+def _pool_wrapper(args, get=None, set=None, delete=None, export=None, imp=None):
+    args.get = get
+    args.set = set
+    args.delete = delete
+    args.export = export
+    setattr(args, 'import', imp)
+    pool(args)
+
+
+def pool_list(args):
+    _pool_wrapper(args)
+
+
+def pool_get(args):
+    _pool_wrapper(args, get=args.pool)
+
+
+def pool_set(args):
+    _pool_wrapper(args, set=(args.pool, args.slots, args.description))
+
+
+def pool_delete(args):
+    _pool_wrapper(args, delete=args.pool)
+
+
+def pool_import(args):
+    _pool_wrapper(args, imp=args.file)
+
+
+def pool_export(args):
+    _pool_wrapper(args, export=args.file)
+
+
+@cli_utils.deprecated_action(new_name=['pools list', 'pools get', 'pools set', 'pools delete', 'pools import',
+                                       'pools export'])
 @cli_utils.action_logging
 def pool(args):
     def _tabulate(pools):
@@ -345,6 +385,36 @@ def pool_export_helper(filepath):
     return pools
 
 
+def _vars_wrapper(args, get=None, set=None, delete=None, export=None, imp=None):
+    args.get = get
+    args.set = set
+    args.delete = delete
+    args.export = export
+    setattr(args, 'import', imp)
+    variables(args)
+
+
+def variables_get(args):
+    _vars_wrapper(args, get=args.key)
+
+
+def variables_delete(args):
+    _vars_wrapper(args, delete=args.key)
+
+
+def variables_set(args):
+    _vars_wrapper(args, set=(args.key, args.value))
+
+
+def variables_import(args):
+    _vars_wrapper(args, imp=args.file)
+
+
+def variables_export(args):
+    _vars_wrapper(args, export=args.file)
+
+
+@cli_utils.deprecated_action(new_name='variables')
 @cli_utils.action_logging
 def variables(args):
     if args.get:
@@ -417,11 +487,13 @@ def export_helper(filepath):
     print("{} variables successfully exported to {}".format(len(var_dict), filepath))
 
 
+@cli_utils.deprecated_action(new_name='dags pause')
 @cli_utils.action_logging
 def pause(args):
     set_is_paused(True, args)
 
 
+@cli_utils.deprecated_action(new_name='dags unpause')
 @cli_utils.action_logging
 def unpause(args):
     set_is_paused(False, args)
@@ -435,6 +507,7 @@ def set_is_paused(is_paused, args):
     print("Dag: {}, paused: {}".format(args.dag_id, str(is_paused)))
 
 
+@cli_utils.deprecated_action(new_name='dags show')
 def show_dag(args):
     dag = get_dag(args)
     dot = render_dag(dag)
@@ -513,6 +586,8 @@ def _run(args, dag, ti):
         executor.end()
 
 
+# Don't warn on deprecation for this one. It is deprecated, but it is used almost exclusively
+# internally, and not warning keeps the required code change smaller.
 @cli_utils.action_logging
 def run(args, dag=None):
     if dag:
@@ -589,6 +664,7 @@ def run(args, dag=None):
     logging.shutdown()
 
 
+@cli_utils.deprecated_action(new_name='tasks failed-deps')
 @cli_utils.action_logging
 def task_failed_deps(args):
     """
@@ -616,6 +692,7 @@ def task_failed_deps(args):
         print("Task instance dependencies are all met.")
 
 
+@cli_utils.deprecated_action(new_name='tasks state')
 @cli_utils.action_logging
 def task_state(args):
     """
@@ -629,6 +706,7 @@ def task_state(args):
     print(ti.current_state())
 
 
+@cli_utils.deprecated_action(new_name='dags state')
 @cli_utils.action_logging
 def dag_state(args):
     """
@@ -641,6 +719,7 @@ def dag_state(args):
     print(dr[0].state if len(dr) > 0 else None)
 
 
+@cli_utils.deprecated_action(new_name='dags next-execution')
 @cli_utils.action_logging
 def next_execution(args):
     """
@@ -666,6 +745,7 @@ def next_execution(args):
         print(None)
 
 
+@cli_utils.deprecated_action(new_name='rotate-fernet-key')
 @cli_utils.action_logging
 def rotate_fernet_key(args):
     session = settings.Session()
@@ -677,6 +757,7 @@ def rotate_fernet_key(args):
     session.commit()
 
 
+@cli_utils.deprecated_action(new_name=['dags list', 'dags report'])
 @cli_utils.action_logging
 def list_dags(args):
     dagbag = DagBag(process_subdir(args.subdir))
@@ -688,10 +769,17 @@ def list_dags(args):
     """)
     dag_list = "\n".join(sorted(dagbag.dags))
     print(s.format(dag_list=dag_list))
-    if args.report:
+    if getattr(args, 'report', False):
         print(dagbag.dagbag_report())
 
 
+def list_dags_report(args):
+    args.report = True
+    args.deprecation_warning = False
+    list_dags(args)
+
+
+@cli_utils.deprecated_action(new_name='tasks list')
 @cli_utils.action_logging
 def list_tasks(args, dag=None):
     dag = dag or get_dag(args)
@@ -702,6 +790,7 @@ def list_tasks(args, dag=None):
         print("\n".join(sorted(tasks)))
 
 
+@cli_utils.deprecated_action(new_name='tasks test')
 @cli_utils.action_logging
 def test(args, dag=None):
     # We want log output from operators etc to show up here. Normally
@@ -738,6 +827,7 @@ def test(args, dag=None):
         logging.getLogger('airflow.task').propagate = False
 
 
+@cli_utils.deprecated_action(new_name='tasks render')
 @cli_utils.action_logging
 def render(args):
     dag = get_dag(args)
@@ -753,6 +843,7 @@ def render(args):
         """.format(attr, getattr(task, attr))))
 
 
+@cli_utils.deprecated_action(new_name='tasks clear')
 @cli_utils.action_logging
 def clear(args):
     logging.basicConfig(
@@ -1249,6 +1340,7 @@ def _serve_logs(env, skip_serve_logs=False):
     return None
 
 
+@cli_utils.deprecated_action(new_name='kubernetes generate-dag-yaml')
 @cli_utils.action_logging
 def kubernetes_generate_dag_yaml(args):
     from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubeConfig
@@ -1372,6 +1464,7 @@ Happy Airflowing!
     print(output_string)
 
 
+@cli_utils.deprecated_action(new_name='celery worker')
 @cli_utils.action_logging
 def worker(args):
     env = os.environ.copy()
@@ -1447,6 +1540,7 @@ def worker(args):
         sp.kill()
 
 
+@cli_utils.deprecated_action(new_name='db init')
 def initdb(args):  # noqa
     py2_deprecation_waring()
     print("DB: " + repr(settings.engine.url))
@@ -1454,6 +1548,7 @@ def initdb(args):  # noqa
     print("Done.")
 
 
+@cli_utils.deprecated_action(new_name='db reset')
 def resetdb(args):
     py2_deprecation_waring()
     print("DB: " + repr(settings.engine.url))
@@ -1465,6 +1560,7 @@ def resetdb(args):
         print("Bail.")
 
 
+@cli_utils.deprecated_action(new_name='db shell')
 @cli_utils.action_logging
 def shell(args):
     """Run a shell that allows to access database access"""
@@ -1506,6 +1602,7 @@ def upgradedb(args):  # noqa
     db.upgradedb()
 
 
+@cli_utils.deprecated_action(new_name='db check')
 @cli_utils.action_logging
 def checkdb(args):  # noqa
     py2_deprecation_waring()
@@ -1522,6 +1619,26 @@ alternative_conn_specs = ['conn_type', 'conn_host',
                           'conn_login', 'conn_password', 'conn_schema', 'conn_port']
 
 
+def _conn_wrapper(args, list=None, delete=None, add=None):
+    args.list = list
+    args.delete = delete
+    args.add = add
+    connections(args)
+
+
+def connections_list(args):
+    _conn_wrapper(args, list=True)
+
+
+def connections_add(args):
+    _conn_wrapper(args, add=True)
+
+
+def connections_delete(args):
+    _conn_wrapper(args, delete=True)
+
+
+@cli_utils.deprecated_action(sub_commands=True)
 @cli_utils.action_logging
 def connections(args):
     if args.list:
@@ -1655,6 +1772,7 @@ def connections(args):
         return
 
 
+@cli_utils.deprecated_action(new_name='celery flower')
 @cli_utils.action_logging
 def flower(args):
     broka = conf.get('celery', 'BROKER_URL')
@@ -1734,6 +1852,7 @@ def kerberos(args):  # noqa
         airflow.security.kerberos.run(principal=args.principal, keytab=args.keytab)
 
 
+@cli_utils.deprecated_action(new_name='users create')
 @cli_utils.action_logging
 def create_user(args):
     fields = {
@@ -1774,6 +1893,7 @@ def create_user(args):
         raise SystemExit('Failed to create user.')
 
 
+@cli_utils.deprecated_action(new_name='users delete')
 @cli_utils.action_logging
 def delete_user(args):
     if not args.username:
@@ -1792,6 +1912,7 @@ def delete_user(args):
         raise SystemExit('Failed to delete user.')
 
 
+@cli_utils.deprecated_action(new_name='users list')
 @cli_utils.action_logging
 def list_users(args):
     appbuilder = cached_appbuilder()
@@ -1805,6 +1926,7 @@ def list_users(args):
     print(msg)
 
 
+@cli_utils.deprecated_action(new_name='dags list-runs')
 @cli_utils.action_logging
 def list_dag_runs(args, dag=None):
     if dag:
@@ -1857,6 +1979,7 @@ def list_dag_runs(args, dag=None):
         print(record)
 
 
+@cli_utils.deprecated_action(new_name='sync-perm')
 @cli_utils.action_logging
 def sync_perm(args): # noqa
     if settings.RBAC:
@@ -1874,6 +1997,7 @@ def sync_perm(args): # noqa
         print('The sync_perm command only works for rbac UI.')
 
 
+@cli_utils.deprecated_action(new_name='config list')
 def config(args):
     """Show current application configuration"""
     with io.StringIO() as output:
@@ -2282,17 +2406,935 @@ Please install apache-airflow-upgrade-check distribution from PyPI to perform up
 """)
 
 
-class Arg(object):
-    def __init__(self, flags=None, help=None, action=None, default=None, nargs=None,
-                 type=None, choices=None, metavar=None):
+# Used in Arg to enable `None' as a distinct value from "not passed"
+_UNSET = object()
+
+
+class Arg:
+    """Class to keep information about command line argument"""
+
+    # pylint: disable=redefined-builtin,unused-argument
+    def __init__(
+        self,
+        flags=_UNSET,
+        help=_UNSET,
+        action=_UNSET,
+        default=_UNSET,
+        nargs=_UNSET,
+        type=_UNSET,
+        choices=_UNSET,
+        required=_UNSET,
+        metavar=_UNSET,
+    ):
         self.flags = flags
-        self.help = help
-        self.action = action
-        self.default = default
-        self.nargs = nargs
-        self.type = type
-        self.choices = choices
-        self.metavar = metavar
+        self.kwargs = {}
+        for k, v in locals().items():
+            if v is _UNSET:
+                continue
+            if k in ("self", "flags"):
+                continue
+
+            self.kwargs[k] = v
+
+    # pylint: enable=redefined-builtin,unused-argument
+
+    def add_to_parser(self, parser):
+        """Add this argument to an ArgumentParser"""
+        parser.add_argument(*self.flags, **self.kwargs)
+
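+# Illustrative use of Arg (the same pattern the ARG_* constants below rely on): declare the
+# argument once and wire it into a parser with add_to_parser, e.g.
+#
+#     parser = argparse.ArgumentParser()
+#     ARG_DAG_ID.add_to_parser(parser)  # equivalent to parser.add_argument("dag_id", help="The id of the dag")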
+
+def positive_int(value):
+    """Define a positive int type for an argument."""
+    try:
+        value = int(value)
+        if value > 0:
+            return value
+    except ValueError:
+        pass
+    raise argparse.ArgumentTypeError("invalid positive int value: '{}'".format(value))
+
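+# positive_int is meant to be passed as an argparse ``type=`` callable, e.g. (hypothetical flag)
+# Arg(("--num-executions",), type=positive_int); it rejects 0, negatives and non-integers.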
+
+# Shared
+ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag")
+ARG_TASK_ID = Arg(("task_id",), help="The id of the task")
+ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the DAG", type=parsedate)
+ARG_TASK_REGEX = Arg(
+    ("-t", "--task-regex"), help="The regex to filter specific task_ids to backfill (optional)"
+)
+ARG_SUBDIR = Arg(
+    ("-S", "--subdir"),
+    help=(
+        "File location or directory from which to look for the dag. "
+        "Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the "
+        "value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg' "
+    ),
+    default='[AIRFLOW_HOME]/dags' if BUILD_DOCS else settings.DAGS_FOLDER,
+)
+ARG_START_DATE = Arg(("-s", "--start-date"), help="Override start_date YYYY-MM-DD", type=parsedate)
+ARG_END_DATE = Arg(("-e", "--end-date"), help="Override end_date YYYY-MM-DD", type=parsedate)
+ARG_OUTPUT_PATH = Arg(
+    (
+        "-o",
+        "--output-path",
+    ),
+    help="The output for generated yaml files",
+    type=str,
+    default="[CWD]" if BUILD_DOCS else os.getcwd(),
+)
+ARG_DRY_RUN = Arg(
+    ("-n", "--dry-run"),
+    help="Perform a dry run for each task. Only renders Template Fields for each task, nothing else",
+    action="store_true",
+)
+ARG_PID = Arg(("--pid",), help="PID file location", nargs='?')
+ARG_DAEMON = Arg(
+    ("-D", "--daemon"), help="Daemonize instead of running in the foreground", action="store_true"
+)
+ARG_STDERR = Arg(("--stderr",), help="Redirect stderr to this file")
+ARG_STDOUT = Arg(("--stdout",), help="Redirect stdout to this file")
+ARG_LOG_FILE = Arg(("-l", "--log-file"), help="Location of the log file")
+ARG_YES = Arg(
+    ("-y", "--yes"), help="Do not prompt to confirm reset. Use with care!", action="store_true", default=False
+)
+
+# list_dag_runs
+ARG_DAG_ID_OPT = Arg(("-d", "--dag-id"), help="The id of the dag")
+ARG_NO_BACKFILL = Arg(
+    ("--no-backfill",), help="filter all the backfill dagruns given the dag id", action="store_true"
+)
+ARG_STATE = Arg(("--state",), help="Only list the dag runs corresponding to the state")
+
+# backfill
+ARG_MARK_SUCCESS = Arg(
+    ("-m", "--mark-success"), help="Mark jobs as succeeded without running them", action="store_true"
+)
+ARG_LOCAL = Arg(("-l", "--local"), help="Run the task using the LocalExecutor", action="store_true")
+ARG_POOL = Arg(("--pool",), "Resource pool to use")
+
+# list_tasks
+ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true")
+
+# clear
+ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks", action="store_true")
+ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs", action="store_true")
+ARG_ONLY_RUNNING = Arg(("-r", "--only-running"), help="Only running jobs", action="store_true")
+ARG_DOWNSTREAM = Arg(("-d", "--downstream"), help="Include downstream tasks", action="store_true")
+ARG_EXCLUDE_SUBDAGS = Arg(("-x", "--exclude-subdags"), help="Exclude subdags", action="store_true")
+ARG_EXCLUDE_PARENTDAG = Arg(
+    ("-X", "--exclude-parentdag"),
+    help="Exclude ParentDAGS if the task cleared is a part of a SubDAG",
+    action="store_true",
+)
+ARG_DAG_REGEX = Arg(
+    ("-R", "--dag-regex"), help="Search dag_id as regex instead of exact string", action="store_true"
+)
+
+# show_dag
+ARG_SAVE = Arg(("-s", "--save"), help="Saves the result to the indicated file.")
+
+ARG_IMGCAT = Arg(("--imgcat",), help="Displays graph using the imgcat tool.", action='store_true')
+
+# trigger_dag
+ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run")
+ARG_CONF = Arg(('-c', '--conf'), help="JSON string that gets pickled into the DagRun's conf attribute")
+ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
+
+# pool
+ARG_POOL_NAME = Arg(("pool",), metavar='NAME', help="Pool name")
+ARG_POOL_SLOTS = Arg(("slots",), type=int, help="Pool slots")
+ARG_POOL_DESCRIPTION = Arg(("description",), help="Pool description")
+ARG_POOL_IMPORT = Arg(("file",), metavar="FILEPATH", help="Import pools from JSON file")
+ARG_POOL_EXPORT = Arg(("file",), metavar="FILEPATH", help="Export all pools to JSON file")
+
+# variables
+ARG_VAR = Arg(("key",), help="Variable key")
+ARG_VAR_VALUE = Arg(("value",), metavar='VALUE', help="Variable value")
+ARG_DEFAULT = Arg(
+    ("-d", "--default"), metavar="VAL", default=None, help="Default value returned if variable does not exist"
+)
+ARG_JSON = Arg(("-j", "--json"), help="Deserialize JSON variable", action="store_true")
+ARG_VAR_IMPORT = Arg(("file",), help="Import variables from JSON file")
+ARG_VAR_EXPORT = Arg(("file",), help="Export all variables to JSON file")
+
+# kerberos
+ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs='?')
+ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs='?', default=conf.get('kerberos', 'keytab'))
+# run
+# TODO(aoen): "force" is a poor choice of name here since it implies it overrides
+# all dependencies (not just past success), e.g. the ignore_depends_on_past
+# dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
+# the "ignore_all_dependencies" command should be called the"force" command
+# instead.
+ARG_INTERACTIVE = Arg(
+    ('-N', '--interactive'),
+    help='Do not capture standard output and error streams (useful for interactive debugging)',
+    action='store_true',
+)
+ARG_FORCE = Arg(
+    ("-f", "--force"),
+    help="Ignore previous task instance state, rerun regardless if task already succeeded/failed",
+    action="store_true",
+)
+ARG_RAW = Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true")
+ARG_IGNORE_ALL_DEPENDENCIES = Arg(
+    ("-A", "--ignore-all-dependencies"),
+    help="Ignores all non-critical dependencies, including ignore_ti_state and ignore_task_deps",
+    action="store_true",
+)
+# TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
+# vague (e.g. a task being in the appropriate state to be run is also a dependency
+# but is not ignored by this flag), the name 'ignore_task_dependencies' is
+# slightly better (as it ignores all dependencies that are specific to the task),
+# so deprecate the old command name and use this instead.
+ARG_IGNORE_DEPENDENCIES = Arg(
+    ("-i", "--ignore-dependencies"),
+    help="Ignore task-specific dependencies, e.g. upstream, depends_on_past, and retry delay dependencies",
+    action="store_true",
+)
+ARG_IGNORE_DEPENDS_ON_PAST = Arg(
+    ("-I", "--ignore-depends-on-past"),
+    help="Ignore depends_on_past dependencies (but respect upstream dependencies)",
+    action="store_true",
+)
+ARG_SHIP_DAG = Arg(
+    ("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the worker", action="store_true"
+)
+ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)")
+ARG_JOB_ID = Arg(("-j", "--job-id"), help=argparse.SUPPRESS)
+ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
+
+# worker
+ARG_QUEUES = Arg(
+    ("-q", "--queues"),
+    help="Comma delimited list of queues to serve",
+    default=conf.get('celery', 'DEFAULT_QUEUE'),
+)
+ARG_CONCURRENCY = Arg(
+    ("-c", "--concurrency"),
+    type=int,
+    help="The number of worker processes",
+    default=conf.get('celery', 'worker_concurrency'),
+)
+ARG_CELERY_HOSTNAME = Arg(
+    ("-H", "--celery-hostname"),
+    help="Set the hostname of celery worker if you have multiple workers on a single machine",
+)
+
+# flower
+ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API")
+ARG_FLOWER_HOSTNAME = Arg(
+    ("-H", "--hostname"),
+    default=conf.get('celery', 'FLOWER_HOST'),
+    help="Set the hostname on which to run the server",
+)
+ARG_FLOWER_PORT = Arg(
+    ("-p", "--port"),
+    default=conf.get('celery', 'FLOWER_PORT'),
+    type=int,
+    help="The port on which to run the server",
+)
+ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower")
+ARG_FLOWER_URL_PREFIX = Arg(
+    ("-u", "--url-prefix"), default=conf.get('celery', 'FLOWER_URL_PREFIX'), help="URL prefix for Flower"
+)
+ARG_FLOWER_BASIC_AUTH = Arg(
+    ("-A", "--basic-auth"),
+    default=conf.get('celery', 'FLOWER_BASIC_AUTH'),
+    help=(
+        "Securing Flower with Basic Authentication. "
+        "Accepts user:password pairs separated by a comma. "
+        "Example: flower_basic_auth = user1:password1,user2:password2"
+    ),
+)
+ARG_TASK_PARAMS = Arg(("-t", "--task-params"), help="Sends a JSON params dict to the task")
+ARG_POST_MORTEM = Arg(
+    ("-m", "--post-mortem"), action="store_true", help="Open debugger on uncaught exception"
+)
+
+# connections
+ARG_CONN_ID = Arg(('conn_id',), help='Connection id, required to get/add/delete a connection', type=str)
+ARG_CONN_URI = Arg(
+    ('--conn-uri',), help='Connection URI, required to add a connection without conn_type', type=str
+)
+ARG_CONN_TYPE = Arg(
+    ('--conn-type',), help='Connection type, required to add a connection without conn_uri', type=str
+)
+ARG_CONN_HOST = Arg(('--conn-host',), help='Connection host, optional when adding a connection', type=str)
+ARG_CONN_LOGIN = Arg(('--conn-login',), help='Connection login, optional when adding a connection', type=str)
+ARG_CONN_PASSWORD = Arg(
+    ('--conn-password',), help='Connection password, optional when adding a connection', type=str
+)
+ARG_CONN_SCHEMA = Arg(
+    ('--conn-schema',), help='Connection schema, optional when adding a connection', type=str
+)
+ARG_CONN_PORT = Arg(('--conn-port',), help='Connection port, optional when adding a connection', type=str)
+ARG_CONN_EXTRA = Arg(
+    ('--conn-extra',), help='Connection `Extra` field, optional when adding a connection', type=str
+)
+
+# users
+ARG_USERNAME = Arg(('-u', '--username'), help='Username of the user', required=True, type=str)
+ARG_FIRSTNAME = Arg(('-f', '--firstname'), help='First name of the user', required=True, type=str)
+ARG_LASTNAME = Arg(('-l', '--lastname'), help='Last name of the user', required=True, type=str)
+ARG_ROLE = Arg(
+    ('-r', '--role'),
+    help='Role of the user. Existing roles include Admin, User, Op, Viewer, and Public',
+    required=True,
+    type=str,
+)
+ARG_EMAIL = Arg(('-e', '--email'), help='Email of the user', required=True, type=str)
+ARG_PASSWORD = Arg(
+    ('-p', '--password'),
+    help='Password of the user, required to create a user without --use-random-password',
+    type=str,
+)
+ARG_USE_RANDOM_PASSWORD = Arg(
+    ('--use-random-password',),
+    help='Do not prompt for password. Use random string instead.'
+    ' Required to create a user without --password ',
+    default=False,
+    action='store_true',
+)
+
+# worker autoscaling / serve-logs
+ARG_AUTOSCALE = Arg(('-a', '--autoscale'), help="Minimum and maximum number of workers to autoscale")
+ARG_SKIP_SERVE_LOGS = Arg(
+    ("-s", "--skip-serve-logs"),
+    default=False,
+    help="Don't start the serve logs process along with the workers",
+    action="store_true",
+)
+
+ALTERNATIVE_CONN_SPECS_ARGS = [
+    ARG_CONN_TYPE,
+    ARG_CONN_HOST,
+    ARG_CONN_LOGIN,
+    ARG_CONN_PASSWORD,
+    ARG_CONN_SCHEMA,
+    ARG_CONN_PORT,
+]
+
+# A special "argument" (that is hidden from help) that sets `args.deprecation_warning=False` in the resulting
+# Namespace. Add this to any commands that use the same implementation function in new and old names to
+# supresses the warning for the new form.
+NOT_DEPRECATED = Arg(("--deprecation_warning",), help=argparse.SUPPRESS, default=False, required=False)
+
+_ActionCommand = namedtuple('ActionCommand', ['name', 'help', 'func', 'args', 'description', 'epilog',
+                                              'prog'])
+_GroupCommand = namedtuple('GroupCommand', ['name', 'help', 'subcommands', 'description', 'epilog'])
+
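+# namedtuple gained a `defaults` argument only in Python 3.7, so make every field optional
+# (defaulting to None) by padding __new__.__defaults__ to the full field count.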
+_ActionCommand.__new__.__defaults__ = (None,) * len(_ActionCommand._fields)  # type: ignore
+_GroupCommand.__new__.__defaults__ = (None,) * len(_GroupCommand._fields)  # type: ignore
+
+ActionCommand = cast(Any, _ActionCommand)
+GroupCommand = cast(Any, _GroupCommand)
+
+
+DAGS_COMMANDS = (
+    ActionCommand(
+        name='list',
+        help="List all the DAGs",
+        func=list_dags,
+        args=(ARG_SUBDIR, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='report',
+        help='Show DagBag loading report',
+        func=list_dags_report,
+        args=(ARG_SUBDIR, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='list-runs',
+        help="List DAG runs given a DAG id",
+        description=(
+            "List DAG runs given a DAG id. If state option is given, it will only search for all the "
+            "dagruns with the given state. If no_backfill option is given, it will filter out all "
+            "backfill dagruns for given dag id. If start_date is given, it will filter out all the "
+            "dagruns that were executed before this date. If end_date is given, it will filter out "
+            "all the dagruns that were executed after this date. "
+        ),
+        func=list_dag_runs,
+        args=(ARG_DAG_ID_OPT, ARG_NO_BACKFILL, ARG_STATE, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='state',
+        help="Get the status of a dag run",
+        func=dag_state,
+        args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='next-execution',
+        help="Get the next execution datetimes of a DAG",
+        description=(
+            "Get the next execution datetimes of a DAG. It returns one execution unless the "
+            "num-executions option is given"
+        ),
+        func=next_execution,
+        args=(ARG_DAG_ID, ARG_SUBDIR, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='pause',
+        help='Pause a DAG',
+        func=pause,
+        args=(ARG_DAG_ID, ARG_SUBDIR, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='unpause',
+        help='Resume a paused DAG',
+        func=unpause,
+        args=(ARG_DAG_ID, ARG_SUBDIR, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='trigger',
+        help='Trigger a DAG run',
+        func=trigger_dag,
+        args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='delete',
+        help="Delete all DB records related to the specified DAG",
+        func=delete_dag,
+        args=(ARG_DAG_ID, ARG_YES, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='show',
+        help="Displays DAG's tasks with their dependencies",
+        description=(
+            "The --imgcat option only works in iTerm.\n"
+            "\n"
+            "For more information, see: https://www.iterm2.com/documentation-images.html\n"
+            "\n"
+            "The --save option saves the result to the indicated file.\n"
+            "\n"
+            "The file format is determined by the file extension. "
+            "For more information about supported "
+            "format, see: https://www.graphviz.org/doc/info/output.html\n"
+            "\n"
+            "If you want to create a PNG file then you should execute the following command:\n"
+            "airflow dags show <DAG_ID> --save output.png\n"
+            "\n"
+            "If you want to create a DOT file then you should execute the following command:\n"
+            "airflow dags show <DAG_ID> --save output.dot\n"
+        ),
+        func=show_dag,
+        args=(
+            ARG_DAG_ID,
+            ARG_SUBDIR,
+            ARG_SAVE,
+            ARG_IMGCAT,
+            NOT_DEPRECATED,
+        ),
+    ),
+)
+TASKS_COMMANDS = (
+    ActionCommand(
+        name='list',
+        help="List the tasks within a DAG",
+        func=list_tasks,
+        args=(ARG_DAG_ID, ARG_TREE, ARG_SUBDIR, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='clear',
+        help="Clear a set of task instance, as if they never ran",
+        func=clear,
+        args=(
+            ARG_DAG_ID,
+            ARG_TASK_REGEX,
+            ARG_START_DATE,
+            ARG_END_DATE,
+            ARG_SUBDIR,
+            ARG_UPSTREAM,
+            ARG_DOWNSTREAM,
+            ARG_YES,
+            ARG_ONLY_FAILED,
+            ARG_ONLY_RUNNING,
+            ARG_EXCLUDE_SUBDAGS,
+            ARG_EXCLUDE_PARENTDAG,
+            ARG_DAG_REGEX,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='state',
+        help="Get the status of a task instance",
+        func=task_state,
+        args=(ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='failed-deps',
+        help="Returns the unmet dependencies for a task instance",
+        description=(
+            "Returns the unmet dependencies for a task instance from the perspective of the scheduler. "
+            "In other words, why a task instance doesn't get scheduled and then queued by the scheduler, "
+            "and then run by an executor."
+        ),
+        func=task_failed_deps,
+        args=(ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='render',
+        help="Render a task instance's template(s)",
+        func=render,
+        args=(ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='run',
+        help="Run a single task instance",
+        func=run,
+        args=(
+            ARG_DAG_ID,
+            ARG_TASK_ID,
+            ARG_EXECUTION_DATE,
+            ARG_SUBDIR,
+            ARG_MARK_SUCCESS,
+            ARG_FORCE,
+            ARG_POOL,
+            ARG_CFG_PATH,
+            ARG_LOCAL,
+            ARG_RAW,
+            ARG_IGNORE_ALL_DEPENDENCIES,
+            ARG_IGNORE_DEPENDENCIES,
+            ARG_IGNORE_DEPENDS_ON_PAST,
+            ARG_SHIP_DAG,
+            ARG_PICKLE,
+            ARG_JOB_ID,
+            ARG_INTERACTIVE,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='test',
+        help="Test a task instance",
+        description=(
+            "Test a task instance. This will run a task without checking for dependencies or recording "
+            "its state in the database"
+        ),
+        func=test,
+        args=(
+            ARG_DAG_ID,
+            ARG_TASK_ID,
+            ARG_EXECUTION_DATE,
+            ARG_SUBDIR,
+            ARG_DRY_RUN,
+            ARG_TASK_PARAMS,
+            ARG_POST_MORTEM,
+            NOT_DEPRECATED,
+        ),
+    ),
+)
+POOLS_COMMANDS = (
+    ActionCommand(
+        name='list',
+        help='List pools',
+        func=pool_list,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='get',
+        help='Get pool size',
+        func=pool_get,
+        args=(
+            ARG_POOL_NAME,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='set',
+        help='Configure pool',
+        func=pool_set,
+        args=(
+            ARG_POOL_NAME,
+            ARG_POOL_SLOTS,
+            ARG_POOL_DESCRIPTION,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='delete',
+        help='Delete pool',
+        func=pool_delete,
+        args=(
+            ARG_POOL_NAME,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='import',
+        help='Import pools',
+        func=pool_import,
+        args=(
+            ARG_POOL_IMPORT,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='export',
+        help='Export all pools',
+        func=pool_export,
+        args=(
+            ARG_POOL_EXPORT,
+            NOT_DEPRECATED,
+        ),
+    ),
+)
+VARIABLES_COMMANDS = (
+    ActionCommand(
+        name='get',
+        help='Get variable',
+        func=variables_get,
+        args=(ARG_VAR, ARG_JSON, ARG_DEFAULT, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='set',
+        help='Set variable',
+        func=variables_set,
+        args=(ARG_VAR, ARG_VAR_VALUE, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='delete',
+        help='Delete variable',
+        func=variables_delete,
+        args=(ARG_VAR, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='import',
+        help='Import variables',
+        func=variables_import,
+        args=(ARG_VAR_IMPORT, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='export',
+        help='Export all variables',
+        func=variables_export,
+        args=(ARG_VAR_EXPORT, NOT_DEPRECATED),
+    ),
+)
+DB_COMMANDS = (
+    ActionCommand(
+        name='init',
+        help="Initialize the metadata database",
+        func=initdb,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='reset',
+        help="Burn down and rebuild the metadata database",
+        func=resetdb,
+        args=(ARG_YES, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='upgrade',
+        help="Upgrade the metadata database to latest version",
+        func=upgradedb,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='shell',
+        help="Runs a shell to access the database",
+        func=shell,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='check',
+        help="Check if the database can be reached",
+        func=checkdb,
+        args=(NOT_DEPRECATED,),
+    ),
+)
+CONNECTIONS_COMMANDS = (
+    ActionCommand(
+        name='list',
+        help='List connections',
+        func=connections_list,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='add',
+        help='Add a connection',
+        func=connections_add,
+        args=(ARG_CONN_ID, ARG_CONN_URI, ARG_CONN_EXTRA, NOT_DEPRECATED) + tuple(ALTERNATIVE_CONN_SPECS_ARGS),
+    ),
+    ActionCommand(
+        name='delete',
+        help='Delete a connection',
+        func=connections_delete,
+        args=(ARG_CONN_ID, NOT_DEPRECATED),
+    ),
+)
+
+USERS_COMMANDS = (
+    ActionCommand(
+        name='list',
+        help='List users',
+        func=list_users,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='create',
+        help='Create a user',
+        func=create_user,
+        args=(
+            ARG_ROLE,
+            ARG_USERNAME,
+            ARG_EMAIL,
+            ARG_FIRSTNAME,
+            ARG_LASTNAME,
+            ARG_PASSWORD,
+            ARG_USE_RANDOM_PASSWORD,
+            NOT_DEPRECATED,
+        ),
+        epilog=(
+            'examples:\n'
+            'To create a user with the "Admin" role and the username "admin", run:\n'
+            '\n'
+            '    $ airflow users create \\\n'
+            '          --username admin \\\n'
+            '          --firstname FIRST_NAME \\\n'
+            '          --lastname LAST_NAME \\\n'
+            '          --role Admin \\\n'
+            '          --email admin@example.org'
+        ),
+    ),
+    ActionCommand(
+        name='delete',
+        help='Delete a user',
+        func=delete_user,
+        args=(ARG_USERNAME, NOT_DEPRECATED),
+    ),
+)
+
+CELERY_COMMANDS = (
+    ActionCommand(
+        name='worker',
+        help="Start a Celery worker node",
+        func=worker,
+        args=(
+            ARG_QUEUES,
+            ARG_CONCURRENCY,
+            ARG_CELERY_HOSTNAME,
+            ARG_PID,
+            ARG_DAEMON,
+            ARG_STDOUT,
+            ARG_STDERR,
+            ARG_LOG_FILE,
+            ARG_AUTOSCALE,
+            ARG_SKIP_SERVE_LOGS,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='flower',
+        help="Start a Celery Flower",
+        func=flower,
+        args=(
+            ARG_FLOWER_HOSTNAME,
+            ARG_FLOWER_PORT,
+            ARG_FLOWER_CONF,
+            ARG_FLOWER_URL_PREFIX,
+            ARG_FLOWER_BASIC_AUTH,
+            ARG_BROKER_API,
+            ARG_PID,
+            ARG_DAEMON,
+            ARG_STDOUT,
+            ARG_STDERR,
+            ARG_LOG_FILE,
+            NOT_DEPRECATED,
+        ),
+    ),
+)
+
+CONFIG_COMMANDS = (
+    ActionCommand(
+        name='list',
+        help='List options for the configuration',
+        func=config,
+        args=(NOT_DEPRECATED,),
+    ),
+)
+
+KUBERNETES_COMMANDS = (
+    ActionCommand(
+        name='generate-dag-yaml',
+        help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
+        "launching into a cluster",
+        func=kubernetes_generate_dag_yaml,
+        args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH, NOT_DEPRECATED),
+    ),
+)
+
+airflow_commands = [
+    GroupCommand(
+        name='dags',
+        help='Manage DAGs',
+        subcommands=DAGS_COMMANDS,
+    ),
+    GroupCommand(
+        name="kubernetes", help='Tools to help run the KubernetesExecutor', subcommands=KUBERNETES_COMMANDS
+    ),
+    GroupCommand(
+        name='tasks',
+        help='Manage tasks',
+        subcommands=TASKS_COMMANDS,
+    ),
+    GroupCommand(
+        name='pools',
+        help="Manage pools",
+        subcommands=POOLS_COMMANDS,
+    ),
+    GroupCommand(
+        name='variables',
+        help="Manage variables",
+        subcommands=VARIABLES_COMMANDS,
+    ),
+    GroupCommand(
+        name='db',
+        help="Database operations",
+        subcommands=DB_COMMANDS,
+    ),
+    ActionCommand(
+        name='kerberos',
+        help="Start a kerberos ticket renewer",
+        func=kerberos,
+        args=(ARG_PRINCIPAL, ARG_KEYTAB, ARG_PID, ARG_DAEMON, ARG_STDOUT, ARG_STDERR, ARG_LOG_FILE),
+    ),
+    GroupCommand(
+        name='connections',
+        help="Manage connections",
+        subcommands=CONNECTIONS_COMMANDS,
+    ),
+    GroupCommand(
+        name='users',
+        help="Manage users",
+        subcommands=USERS_COMMANDS,
+    ),
+    ActionCommand(
+        name='sync-perm',
+        help="Update permissions for existing roles and DAGs",
+        func=sync_perm,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='rotate-fernet-key',
+        func=rotate_fernet_key,
+        help='Rotate encrypted connection credentials and variables',
+        description=(
+            'Rotate all encrypted connection credentials and variables; see '
+            'https://airflow.apache.org/docs/stable/howto/secure-connections.html'
+            '#rotating-encryption-keys'
+        ),
+        args=(NOT_DEPRECATED,),
+    ),
+    GroupCommand(name="config", help='View configuration', subcommands=CONFIG_COMMANDS),
+    GroupCommand(
+        name="celery",
+        help='Celery components',
+        description=(
+            'Start celery components. Works only when using CeleryExecutor. For more information, see '
+            'https://airflow.apache.org/docs/stable/executor/celery.html'
+        ),
+        subcommands=CELERY_COMMANDS,
+    ),
+]
+ALL_COMMANDS_DICT = {sp.name: sp for sp in airflow_commands}
+DAG_CLI_COMMANDS = {'list_tasks', 'backfill', 'test', 'run', 'pause', 'unpause', 'list_dag_runs'}
+
+
+class AirflowHelpFormatter(argparse.HelpFormatter):
+    """
+    Custom help formatter to display help message.
+
+    It displays simple commands and groups of commands in separate sections.
+    """
+
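+    # The top-level help produced by this formatter lists command groups under a "Groups:" heading
+    # and plain commands under a "Commands:" heading, skipping any subparser flagged with
+    # hide_from_toplevel_help.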
+    def _format_action(self, action):
+        if isinstance(action, argparse._SubParsersAction):  # pylint: disable=protected-access
+
+            parts = []
+            action_header = self._format_action_invocation(action)
+            action_header = '%*s%s\n' % (self._current_indent, '', action_header)
+            parts.append(action_header)
+
+            self._indent()
+            subactions = action._get_subactions()  # pylint: disable=protected-access
+
+            action_subcommands, group_subcommands = partition(
+                lambda d: isinstance(ALL_COMMANDS_DICT.get(d.dest, None), GroupCommand), subactions
+            )
+            # Remove deprecated groups from the list -- we don't want to show them
+            parts.append("\n")
+            parts.append('%*s%s:\n' % (self._current_indent, '', "Groups"))
+            self._indent()
+            for subaction in group_subcommands:
+                parts.append(self._format_action(subaction))
+            self._dedent()
+
+            parts.append("\n")
+            parts.append('%*s%s:\n' % (self._current_indent, '', "Commands"))
+            self._indent()
+
+            for subaction in action_subcommands:
+                if getattr(action.choices[subaction.dest], 'hide_from_toplevel_help', False):
+                    continue
+                parts.append(self._format_action(subaction))
+            self._dedent()
+            self._dedent()
+
+            # return a single string
+            return self._join_parts(parts)
+
+        return super(AirflowHelpFormatter, self)._format_action(action)
+
+
+def partition(pred, iterable):
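+    """Split *iterable* into two iterators: (items where pred is false, items where pred is true)."""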
+    iter_1, iter_2 = itertools.tee(iterable)
+    return itertools.filterfalse(pred, iter_1), filter(pred, iter_2)
+
+
+def _sort_args(args):
+    """Sort subcommand optional args, keep positional args"""
+
+    def get_long_option(arg):
+        """Get long option from Arg.flags"""
+        return arg.flags[0] if len(arg.flags) == 1 else arg.flags[1]
+
+    positional, optional = partition(lambda x: x.flags[0].startswith("-"), args)
+    for p in positional:
+        yield p
+    for o in sorted(optional, key=lambda x: get_long_option(x).lower()):
+        yield o
+
+
+def _add_command(subparsers, sub):
+
+    sub_proc = subparsers.add_parser(
+        sub.name, help=sub.help, description=sub.description or sub.help, epilog=sub.epilog,
+    )
+    sub_proc.formatter_class = argparse.RawTextHelpFormatter
+
+    if isinstance(sub, GroupCommand):
+        return _add_group_command(sub, sub_proc)
+    elif isinstance(sub, ActionCommand):
+        if sub.prog:
+            sub_proc.prog = sub.prog
+        return _add_action_command(sub, sub_proc)
+    else:
+        raise AirflowException("Invalid command definition.")
+
+
+def _add_action_command(sub, sub_proc):
+    for arg in _sort_args(sub.args):
+        arg.add_to_parser(sub_proc)
+    sub_proc.set_defaults(func=sub.func)
+
+
+def _add_group_command(sub, sub_proc):
+    subcommands = sub.subcommands
+    sub_subparsers = sub_proc.add_subparsers(dest="subcommand", metavar="COMMAND")
+    sub_subparsers.required = True
+
+    for command in sorted(subcommands, key=lambda x: x.name):
+        _add_command(sub_subparsers, command)
+    return sub_proc, sub_subparsers
 
 
 class CLIFactory(object):
@@ -2818,6 +3860,34 @@ class CLIFactory(object):
                 'reset_dag_run', 'rerun_failed_tasks', 'run_backwards'
             )
         }, {
+            'func': generate_pod_template,
+            'help': "Reads your airflow.cfg and migrates your configurations into a "
+                    "airflow_template.yaml file. From this point a user can link"
+                    "this file to airflow using the `pod_template_file` argument"
+                    "and modify using the Kubernetes API",
+            'args': ('output_path',),
+        }, {
+            'func': serve_logs,
+            'help': "Serve logs generate by worker",
+            'args': tuple(),
+        }, {
+            'func': webserver,
+            'help': "Start a Airflow webserver instance",
+            'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname',
+                     'pid', 'daemon', 'stdout', 'stderr', 'access_logfile',
+                     'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'),
+        }, {
+            'help': 'Show the version',
+            'func': version,
+            'args': tuple(),
+        }, {
+            'help': 'Show information about current Airflow and environment',
+            'func': info,
+            'args': ('anonymize', 'file_io', ),
+        },
+    )
+    deprecated_subparsers = (
+        {
             'func': list_dag_runs,
             'help': "List dag runs given a DAG id. If state option is given, it will only"
                     "search for all the dagruns with the given state. "
@@ -2839,14 +3909,6 @@ class CLIFactory(object):
             'args': (
                 'dag_id', 'output_path', 'subdir', 'execution_date'
             )
-
-        }, {
-            'func': generate_pod_template,
-            'help': "Reads your airflow.cfg and migrates your configurations into a"
-                    "airflow_template.yaml file. From this point a user can link"
-                    "this file to airflow using the `pod_template_file` argument"
-                    "and modify using the Kubernetes API",
-            'args': ('output_path',),
         }, {
             'func': clear,
             'help': "Clear a set of task instance, as if they never ran",
@@ -2884,11 +3946,6 @@ class CLIFactory(object):
             "args": ('set', 'get', 'json', 'default',
                      'var_import', 'var_export', 'var_delete'),
         }, {
-            'func': kerberos,
-            'help': "Start a kerberos ticket renewer",
-            'args': ('principal', 'keytab', 'pid',
-                     'daemon', 'stdout', 'stderr', 'log_file'),
-        }, {
             'func': render,
             'help': "Render a task instance's template(s)",
             'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
@@ -2925,10 +3982,6 @@ class CLIFactory(object):
             'help': "Get the status of a task instance",
             'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
         }, {
-            'func': serve_logs,
-            'help': "Serve logs generate by worker",
-            'args': tuple(),
-        }, {
             'func': test,
             'help': (
                 "Test a task instance. This will run a task without checking for "
@@ -2937,12 +3990,6 @@ class CLIFactory(object):
                 'dag_id', 'task_id', 'execution_date', 'subdir', 'dry_run',
                 'task_params', 'post_mortem'),
         }, {
-            'func': webserver,
-            'help': "Start a Airflow webserver instance",
-            'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname',
-                     'pid', 'daemon', 'stdout', 'stderr', 'access_logfile',
-                     'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'),
-        }, {
             'func': resetdb,
             'help': "Burn down and rebuild the metadata database",
             'args': ('yes',),
@@ -2959,12 +4006,6 @@ class CLIFactory(object):
             'help': "Runs a shell to access the database",
             'args': tuple(),
         }, {
-            'func': scheduler,
-            'help': "Start a scheduler instance",
-            'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
-                     'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
-                     'log_file'),
-        }, {
             'func': worker,
             'help': "Start a Celery worker node",
             'args': ('do_pickle', 'queues', 'concurrency', 'celery_hostname',
@@ -2975,10 +4016,6 @@ class CLIFactory(object):
             'args': ('flower_hostname', 'flower_port', 'flower_conf', 'flower_url_prefix',
                      'flower_basic_auth', 'broker_api', 'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
         }, {
-            'func': version,
-            'help': "Show the version",
-            'args': tuple(),
-        }, {
             'func': connections,
             'help': "List/Add/Delete connections",
             'args': ('list_connections', 'add_connection', 'delete_connection',
@@ -3020,11 +4057,6 @@ class CLIFactory(object):
             'args': ('color', ),
         },
         {
-            'help': 'Show information about current Airflow and environment',
-            'func': info,
-            'args': ('anonymize', 'file_io', ),
-        },
-        {
             'name': 'upgrade_check',
             'help': 'Check if you can safely upgrade to the new version.',
             'func': upgrade_check,
@@ -3032,27 +4064,88 @@ class CLIFactory(object):
             'args': (),
         },
     )
-    subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
-    dag_subparsers = (
+    deprecated_dag_subparsers = (
         'list_tasks', 'backfill', 'test', 'run', 'pause', 'unpause', 'list_dag_runs')
 
     @classmethod
     def get_parser(cls, dag_parser=False):
         """Creates and returns command line argument parser"""
+
+        deprecated_subparsers_dict = {sp['func'].__name__: sp for sp in cls.deprecated_subparsers}
+
         class DefaultHelpParser(argparse.ArgumentParser):
             """Override argparse.ArgumentParser.error and use print_help instead of print_usage"""
             def error(self, message):
                 self.print_help()
                 self.exit(2, '\n{} command error: {}, see help above.\n'.format(self.prog, message))
-        parser = DefaultHelpParser()
-        subparsers = parser.add_subparsers(
-            help='sub-command help', dest='subcommand')
+
+            def parse_known_args(self, args, namespace):
+                # Compat hack for optional subcommands in Python 2.7
+                fake_opt = getattr(self, "_fake_optional_subparser", False) and \
+                    (args == [] or args[0].startswith('-'))
+                if fake_opt:
+                    args = ["deprecated_"] + args
+
+                args, remain = super(DefaultHelpParser, self).parse_known_args(args, namespace)
+
+                if fake_opt:
+                    # So it doesn't show up as "deprecated_"
+                    args.subcommand = self._fake_optional_subparser
+                return args, remain
+
+        parser = DefaultHelpParser(formatter_class=AirflowHelpFormatter)
+        subparsers = parser.add_subparsers(dest='subcommand', metavar="GROUP_OR_COMMAND")
         subparsers.required = True
 
-        subparser_list = cls.dag_subparsers if dag_parser else cls.subparsers_dict.keys()
-        for sub in subparser_list:
-            sub = cls.subparsers_dict[sub]
+        subparser_list = DAG_CLI_COMMANDS if dag_parser else ALL_COMMANDS_DICT.keys()
+        for sub_name in sorted(subparser_list):
+            action = _add_command(subparsers, ALL_COMMANDS_DICT[sub_name])
+
+            # Deprecated "mode select", and new sub-command version? Merge them
+            # so they both work, but don't show help for the deprecated
+            # options!
+            if sub_name in deprecated_subparsers_dict and action is not None:
+                sp, sub_subparsers = action
+                deprecated = deprecated_subparsers_dict.pop(sub_name)
+                sp.set_defaults(func=deprecated['func'])
+                if six.PY3:
+                    sub_subparsers.required = False
+
+                    for arg in deprecated['args']:
+                        if 'dag_id' in arg and dag_parser:
+                            continue
+                        arg = cls.args[arg]
+                        # Don't show these options in the help output
+                        kwargs = arg.kwargs.copy()
+                        kwargs['help'] = argparse.SUPPRESS
+                        sp.add_argument(*arg.flags, **kwargs)
+                else:
+                    # Py2 doesn't support optional subcommands, so we have to fake it
+                    sp._fake_optional_subparser = sub_name
+                    _add_command(sub_subparsers, ActionCommand(
+                        prog=sp.prog,
+                        name='deprecated_',
+                        help=deprecated['help'],
+                        func=deprecated['func'],
+                        args=(cls.args[arg] for arg in deprecated['args']),
+                    ))
+
+        if dag_parser:
+            subparser_list = [
+                (deprecated_subparsers_dict[name], False)
+                for name in cls.deprecated_dag_subparsers
+            ]
+        else:
+            current = zip(cls.subparsers, itertools.repeat(False))
+            deprecated = zip(deprecated_subparsers_dict.values(), itertools.repeat(True))
+            subparser_list = itertools.chain(current, deprecated)
+        for (sub, hide_from_toplevel_help) in subparser_list:
+            if hide_from_toplevel_help and BUILD_DOCS:
+                # Don't show the deprecated commands in the docs
+                continue
+
             sp = subparsers.add_parser(sub['func'].__name__, help=sub['help'])
+            sp.hide_from_toplevel_help = hide_from_toplevel_help
             sp.set_defaults(func=sub['func'])
             if 'from_module' in sub:
                 try:
@@ -3065,10 +4158,7 @@ class CLIFactory(object):
                 if 'dag_id' in arg and dag_parser:
                     continue
                 arg = cls.args[arg]
-                kwargs = {
-                    f: v
-                    for f, v in vars(arg).items() if f != 'flags' and v}
-                sp.add_argument(*arg.flags, **kwargs)
+                sp.add_argument(*arg.flags, **arg.kwargs)
         return parser
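
For illustration, here is a minimal standalone Python 3 sketch of the merge
technique used in the hunk above (it is not part of the patch; the
"connections" handlers below are invented for the example). The idea: the
group parser keeps a default handler for the old "mode-select" flags, which
are still accepted but hidden with argparse.SUPPRESS, while the new-style
sub-commands remain optional so both invocation styles parse. On Python 2.7,
where optional sub-parsers are not supported, the diff instead injects a fake
"deprecated_" sub-command inside parse_known_args.

    import argparse

    def deprecated_connections(args):
        # Old-style "mode select" handler: one function, behaviour chosen by flags.
        print("deprecated handler, list=%s" % args.list)

    def connections_list(args):
        # New-style handler for "connections list".
        print("new-style list handler")

    parser = argparse.ArgumentParser(prog="airflow")
    subparsers = parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND")
    subparsers.required = True

    # Group parser: old flags fall back to the deprecated handler ...
    sp = subparsers.add_parser("connections", help="List/Add/Delete connections")
    sp.set_defaults(func=deprecated_connections)

    # ... while the new sub-commands are optional and set their own handler.
    sub = sp.add_subparsers(dest="sub_command")
    sub.required = False
    list_sp = sub.add_parser("list", help="List connections")
    list_sp.set_defaults(func=connections_list)

    # Deprecated mode-select flag: still parsed, but hidden from --help.
    sp.add_argument("-l", "--list", action="store_true", help=argparse.SUPPRESS)

    for argv in (["connections", "-l"], ["connections", "list"]):
        args = parser.parse_args(argv)
        args.func(args)

Both invocations dispatch as intended on Python 3, where a sub-parser's
defaults override the group parser's default func once a sub-command is given.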
 
 
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 31c8c62..578f07c 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -21,17 +21,18 @@
 Utilities module for cli
 """
 from __future__ import absolute_import
+from __future__ import print_function
 
 import functools
 import getpass
 import json
+import os
 import socket
 import struct
 import sys
 from argparse import Namespace
 from datetime import datetime
 from fcntl import ioctl
-from os import environ
 from termios import TIOCGWINSZ
 
 from airflow.models import Log
@@ -163,7 +164,7 @@ def get_terminal_size(fallback=(80, 20)):
         # as when stdout is piped to another program
         pass
     try:
-        return int(environ.get('LINES')), int(environ.get('COLUMNS'))
+        return int(os.environ.get('LINES')), int(os.environ.get('COLUMNS'))
     except TypeError:
         return fallback
 
@@ -171,3 +172,63 @@ def get_terminal_size(fallback=(80, 20)):
 def header(text, fillchar):
     rows, columns = get_terminal_size()
     print(" {} ".format(text).center(columns, fillchar))
+
+
+def deprecated_action(func=None, new_name=None, sub_commands=False):
+    if not func:
+        return functools.partial(deprecated_action, new_name=new_name, sub_commands=sub_commands)
+
+    stream = sys.stderr
+    try:
+        from pip._vendor import colorama
+        WINDOWS = (sys.platform.startswith("win") or
+                   (sys.platform == 'cli' and os.name == 'nt'))
+        if WINDOWS:
+            stream = colorama.AnsiToWin32(sys.stderr)
+    except Exception:
+        colorama = None
+
+    def should_color():
+        # Don't colorize things if we do not have colorama or if told not to
+        if not colorama:
+            return False
+
+        real_stream = (
+            stream if not isinstance(stream, colorama.AnsiToWin32)
+            else stream.wrapped
+        )
+
+        # If the stream is a tty we should color it
+        if hasattr(real_stream, "isatty") and real_stream.isatty():
+            return True
+
+        if os.environ.get("TERM") and "color" in os.environ.get("TERM"):
+            return True
+
+        # If anything else we should not color it
+        return False
+
+    @functools.wraps(func)
+    def wrapper(args):
+        if getattr(args, 'deprecation_warning', True):
+            command = args.subcommand or args.func.__name__
+            if sub_commands:
+                msg = (
+                    "The mode (-l, -d, etc) options to {!r} have been deprecated and removed in Airflow 2.0,"
+                    " please use the get/set/list subcommands instead"
+                ).format(command)
+            else:
+                prefix = "The {!r} command is deprecated and removed in Airflow 2.0, please use "
+                if isinstance(new_name, list):
+                    msg = prefix.format(args.subcommand)
+                    new_names = list(map(repr, new_name))
+                    msg += "{}, or {}".format(", ".join(new_names[:-1]), new_names[-1])
+                    msg += " instead"
+                else:
+                    msg = (prefix + "{!r} instead").format(command, new_name)
+
+            if should_color():
+                msg = "".join([colorama.Fore.YELLOW, msg, colorama.Style.RESET_ALL])
+            print(msg, file=sys.stderr)
+        func(args)
+    return wrapper
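
As a usage illustration (not part of this patch; the handler name and
new_name below are invented), a 1.10 CLI function wrapped with this decorator
prints the warning on stderr before running, unless the parsed args carry
deprecation_warning=False, which appears to be how the new-style entry points
suppress the notice:

    from argparse import Namespace

    from airflow.utils.cli import deprecated_action  # added by this patch

    @deprecated_action(new_name='dags list')
    def list_dags(args):
        print("listing dags under %s" % getattr(args, 'subdir', None))

    # Old-style path: warning first, then the original function runs.
    list_dags(Namespace(subcommand='list_dags', deprecation_warning=True))

    # New-style path: the notice is skipped.
    list_dags(Namespace(subcommand='dags', deprecation_warning=False))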
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 7ef01dc..048f802 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -139,6 +139,8 @@ def create_mock_args(
     args.ignore_dependencies = ignore_dependencies
     args.force = force
     args.interactive = interactive
+    # Needed for CLI deprecation warning decorator
+    args.subcommand = "fake-group"
     return args
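
The new ``subcommand`` attribute exists only to satisfy the deprecation
wrapper added in airflow/utils/cli.py, which reads ``args.subcommand``
(falling back to ``args.func.__name__``) when building its warning. A rough
sketch of the failure it avoids (the mock values are made up):

    from argparse import Namespace

    args = Namespace(dag_id='example')   # mock args without ``subcommand``
    # args.subcommand                    # would raise AttributeError
    args.subcommand = 'fake-group'       # what the updated test now sets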
 
 
diff --git a/tests/cli/test_worker_initialisation.py b/tests/cli/test_worker_initialisation.py
index 99fee8e..b068830 100644
--- a/tests/cli/test_worker_initialisation.py
+++ b/tests/cli/test_worker_initialisation.py
@@ -28,7 +28,7 @@ from tests.test_utils.config import conf_vars
 
 patch('airflow.utils.cli.action_logging', lambda x: x).start()
 from airflow.bin import cli # noqa
-mock_args = Namespace(queues=1, concurrency=1)
+mock_args = Namespace(queues=1, concurrency=1, subcommand='worker')
 
 
 class TestWorkerPrecheck(unittest.TestCase):