You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/02 17:21:02 UTC
[airflow] branch v1-10-stable updated: Add new-style 2.0 command
names for Airflow 1.10.x (#12725)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-stable by this push:
new 5fbd0ed Add new-style 2.0 command names for Airflow 1.10.x (#12725)
5fbd0ed is described below
commit 5fbd0ed4869c2f0b0fcdc63028ee2109f10a61c7
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Dec 2 17:20:09 2020 +0000
Add new-style 2.0 command names for Airflow 1.10.x (#12725)
This introduces the new style commands for 1.10, but keeps all the
existing commands working, they just don't show in the top level
`airflow --help` anymore.
This was done by taking the argparse groups/sub-commands from master
(along with supporting functions) and copying it here, then providing
needed shims for compat.
I have not added any new sub-commands, nor changed any existing
behaviours, so the 1.10 commands won't change in behaviour.
---
UPDATING.md | 57 ++
airflow/bin/cli.py | 1218 +++++++++++++++++++++++++++++--
airflow/utils/cli.py | 65 +-
tests/cli/test_cli.py | 2 +
tests/cli/test_worker_initialisation.py | 2 +-
5 files changed, 1277 insertions(+), 67 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index c29c8a3..577b644 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -25,6 +25,7 @@ assists users migrating to a new version.
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of contents**
+- [Airflow 1.10.14](#airflow-11014)
- [Airflow 1.10.13](#airflow-11013)
- [Airflow 1.10.12](#airflow-11012)
- [Airflow 1.10.11](#airflow-11011)
@@ -61,6 +62,62 @@ More tips can be found in the guide:
https://developers.google.com/style/inclusive-documentation
-->
+## Airflow 1.10.14
+
+### Airflow CLI changes in line with 2.0
+
+The Airflow CLI has been organized so that related commands are grouped together as subcommands,
+which means that if you use these commands in your scripts, you have to make changes to them.
+
+This section describes the changes that have been made, and what you need to do to update your script.
+
+The ability to manipulate users from the command line has been changed. ``airflow create_user``, ``airflow delete_user``
+ and ``airflow list_users`` have been grouped into a single command `airflow users` with sub-commands `create`, `list` and `delete`.
+
+The `airflow list_dags` command is now `airflow dags list`, `airflow pause` is `airflow dags pause`, etc.
+
+In Airflow 1.10 and 2.0 there is an `airflow config` command but there is a difference in behavior. In Airflow 1.10,
+it prints all config options while in Airflow 2.0, it's a command group. `airflow config` is now `airflow config list`.
+You can check other options by running the command `airflow config --help`
+
+Compatibility with the old CLI has been maintained, but the old commands will no longer appear in the help output.
+
+You can learn about the commands by running ``airflow --help``. For example to get help about the ``celery`` group command,
+you have to run the help command: ``airflow celery --help``.
+
+| Old command | New command | Group |
+|-------------------------------|------------------------------------|--------------------|
+| ``airflow worker`` | ``airflow celery worker`` | ``celery`` |
+| ``airflow flower`` | ``airflow celery flower`` | ``celery`` |
+| ``airflow trigger_dag`` | ``airflow dags trigger`` | ``dags`` |
+| ``airflow delete_dag`` | ``airflow dags delete`` | ``dags`` |
+| ``airflow show_dag`` | ``airflow dags show`` | ``dags`` |
+| ``airflow list_dags`` | ``airflow dags list`` | ``dags`` |
+| ``airflow dag_state`` | ``airflow dags state`` | ``dags`` |
+| ``airflow backfill`` | ``airflow dags backfill`` | ``dags`` |
+| ``airflow list_dag_runs`` | ``airflow dags list-runs`` | ``dags`` |
+| ``airflow pause`` | ``airflow dags pause`` | ``dags`` |
+| ``airflow unpause`` | ``airflow dags unpause`` | ``dags`` |
+| ``airflow next_execution`` | ``airflow dags next-execution`` | ``dags`` |
+| ``airflow test`` | ``airflow tasks test`` | ``tasks`` |
+| ``airflow clear`` | ``airflow tasks clear`` | ``tasks`` |
+| ``airflow list_tasks`` | ``airflow tasks list`` | ``tasks`` |
+| ``airflow task_failed_deps`` | ``airflow tasks failed-deps`` | ``tasks`` |
+| ``airflow task_state`` | ``airflow tasks state`` | ``tasks`` |
+| ``airflow run`` | ``airflow tasks run`` | ``tasks`` |
+| ``airflow render`` | ``airflow tasks render`` | ``tasks`` |
+| ``airflow initdb`` | ``airflow db init`` | ``db`` |
+| ``airflow resetdb`` | ``airflow db reset`` | ``db`` |
+| ``airflow upgradedb`` | ``airflow db upgrade`` | ``db`` |
+| ``airflow checkdb`` | ``airflow db check`` | ``db`` |
+| ``airflow shell`` | ``airflow db shell`` | ``db`` |
+| ``airflow pool`` | ``airflow pools`` | ``pools`` |
+| ``airflow create_user`` | ``airflow users create`` | ``users`` |
+| ``airflow delete_user`` | ``airflow users delete`` | ``users`` |
+| ``airflow list_users`` | ``airflow users list`` | ``users`` |
+| ``airflow rotate_fernet_key`` | ``airflow rotate-fernet-key`` | |
+| ``airflow sync_perm`` | ``airflow sync-perm`` | |
+
## Airflow 1.10.13
### TimeSensor is now timezone aware
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 37f70c4..c22e847 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -22,6 +22,7 @@ from __future__ import print_function
import errno
import hashlib
import importlib
+import itertools
import locale
import logging
@@ -32,7 +33,7 @@ import textwrap
import random
import string
import yaml
-from collections import OrderedDict
+from collections import OrderedDict, namedtuple
from importlib import import_module
import getpass
@@ -59,7 +60,7 @@ import threading
import time
import traceback
-from typing import Any
+from typing import Any, cast
import airflow
from airflow import api
@@ -100,7 +101,9 @@ log = logging.getLogger(__name__)
DAGS_FOLDER = settings.DAGS_FOLDER
-if "BUILDING_AIRFLOW_DOCS" in os.environ:
+BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
+
+if BUILD_DOCS:
DAGS_FOLDER = '[AIRFLOW_HOME]/dags'
@@ -238,6 +241,7 @@ def backfill(args, dag=None):
)
+@cli_utils.deprecated_action(new_name='dags trigger')
@cli_utils.action_logging
def trigger_dag(args):
"""
@@ -256,6 +260,7 @@ def trigger_dag(args):
raise AirflowException(err)
+@cli_utils.deprecated_action(new_name='dags delete')
@cli_utils.action_logging
def delete_dag(args):
"""
@@ -276,6 +281,41 @@ def delete_dag(args):
print("Bail.")
+# Compatibility shim for the new-style ``pools`` sub-commands: stores each keyword on ``args`` as the
+# ``get``/``set``/``delete``/``export``/``import`` attribute and then calls the legacy ``pool(args)`` handler
+# (presumably these are the attributes that handler inspects -- its body is not shown here).
+# ``imp`` is written with ``setattr`` because ``import`` is a reserved word, so ``args.import`` is not valid syntax.
+def _pool_wrapper(args, get=None, set=None, delete=None, export=None, imp=None):
+ args.get = get
+ args.set = set
+ args.delete = delete
+ args.export = export
+ setattr(args, 'import', imp)
+ pool(args)
+
+
+def pool_list(args):
+ _pool_wrapper(args)
+
+
+def pool_get(args):
+ _pool_wrapper(args, get=args.pool)
+
+
+def pool_set(args):
+    """Handle ``airflow pools set`` by delegating to the legacy ``pool`` command."""
+    # ARG_POOL_NAME is the positional "pool" (metavar NAME only changes the help text), so argparse
+    # stores the value as ``args.pool``; ``args.name`` does not exist on the namespace.
+    _pool_wrapper(args, set=(args.pool, args.slots, args.description))
+
+
+def pool_delete(args):
+    """Handle ``airflow pools delete`` by delegating to the legacy ``pool`` command."""
+    # The pool name comes from the parsed arguments; ``pool`` on its own is the legacy command function.
+    _pool_wrapper(args, delete=args.pool)
+
+
+def pool_import(args):
+ _pool_wrapper(args, imp=args.file)
+
+
+def pool_export(args):
+ _pool_wrapper(args, export=args.file)
+
+
+@cli_utils.deprecated_action(new_name=['pools list', 'pools get', 'pools set', 'pools delete', 'pools import',
+ 'pools export'])
@cli_utils.action_logging
def pool(args):
def _tabulate(pools):
@@ -345,6 +385,36 @@ def pool_export_helper(filepath):
return pools
+# Compatibility shim for the new-style ``variables`` sub-commands: stores each keyword on ``args`` as the
+# ``get``/``set``/``delete``/``export``/``import`` attribute and then calls the legacy ``variables(args)`` handler.
+# ``imp`` is written with ``setattr`` because ``import`` is a reserved word, so ``args.import`` is not valid syntax.
+def _vars_wrapper(args, get=None, set=None, delete=None, export=None, imp=None):
+ args.get = get
+ args.set = set
+ args.delete = delete
+ args.export = export
+ setattr(args, 'import', imp)
+ variables(args)
+
+
+def variables_get(args):
+ _vars_wrapper(args, get=args.key)
+
+
+def variables_delete(args):
+ _vars_wrapper(args, delete=args.key)
+
+
+def variables_set(args):
+    """Handle ``airflow variables set`` by delegating to the legacy ``variables`` command."""
+    # Pass key and value together; forwarding the key alone would drop the value to store.
+    # NOTE(review): assumes the ``set`` sub-command registers ARG_VAR_VALUE (dest ``value``) and that
+    # ``variables`` reads ``args.set`` as a (key, value) pair -- confirm against both.
+    _vars_wrapper(args, set=(args.key, args.value))
+
+
+def variables_import(args):
+ _vars_wrapper(args, imp=args.file)
+
+
+def variables_export(args):
+ _vars_wrapper(args, export=args.file)
+
+
+@cli_utils.deprecated_action(new_name='variables')
@cli_utils.action_logging
def variables(args):
if args.get:
@@ -417,11 +487,13 @@ def export_helper(filepath):
print("{} variables successfully exported to {}".format(len(var_dict), filepath))
+@cli_utils.deprecated_action(new_name='dags pause')
@cli_utils.action_logging
def pause(args):
set_is_paused(True, args)
+@cli_utils.deprecated_action(new_name='dags unpause')
@cli_utils.action_logging
def unpause(args):
set_is_paused(False, args)
@@ -435,6 +507,7 @@ def set_is_paused(is_paused, args):
print("Dag: {}, paused: {}".format(args.dag_id, str(is_paused)))
+@cli_utils.deprecated_action(new_name='dags show')
def show_dag(args):
dag = get_dag(args)
dot = render_dag(dag)
@@ -513,6 +586,8 @@ def _run(args, dag, ti):
executor.end()
+# Don't warn on deprecation on this one. It is deprecated, but it is used almost exclusively internally, and
+# by not warning we have to make a smaller code change.
@cli_utils.action_logging
def run(args, dag=None):
if dag:
@@ -589,6 +664,7 @@ def run(args, dag=None):
logging.shutdown()
+@cli_utils.deprecated_action(new_name='tasks failed-deps')
@cli_utils.action_logging
def task_failed_deps(args):
"""
@@ -616,6 +692,7 @@ def task_failed_deps(args):
print("Task instance dependencies are all met.")
+@cli_utils.deprecated_action(new_name='tasks state')
@cli_utils.action_logging
def task_state(args):
"""
@@ -629,6 +706,7 @@ def task_state(args):
print(ti.current_state())
+@cli_utils.deprecated_action(new_name='dags state')
@cli_utils.action_logging
def dag_state(args):
"""
@@ -641,6 +719,7 @@ def dag_state(args):
print(dr[0].state if len(dr) > 0 else None)
+@cli_utils.deprecated_action(new_name='dags next-execution')
@cli_utils.action_logging
def next_execution(args):
"""
@@ -666,6 +745,7 @@ def next_execution(args):
print(None)
+@cli_utils.deprecated_action(new_name='rotate-fernet-key')
@cli_utils.action_logging
def rotate_fernet_key(args):
session = settings.Session()
@@ -677,6 +757,7 @@ def rotate_fernet_key(args):
session.commit()
+@cli_utils.deprecated_action(new_name=['dags list', 'dags report'])
@cli_utils.action_logging
def list_dags(args):
dagbag = DagBag(process_subdir(args.subdir))
@@ -688,10 +769,17 @@ def list_dags(args):
""")
dag_list = "\n".join(sorted(dagbag.dags))
print(s.format(dag_list=dag_list))
- if args.report:
+ if getattr(args, 'report', False):
print(dagbag.dagbag_report())
+def list_dags_report(args):
+ args.report = True
+ args.deprecation_warning = False
+ list_dags(args)
+
+
+@cli_utils.deprecated_action(new_name='tasks list')
@cli_utils.action_logging
def list_tasks(args, dag=None):
dag = dag or get_dag(args)
@@ -702,6 +790,7 @@ def list_tasks(args, dag=None):
print("\n".join(sorted(tasks)))
+@cli_utils.deprecated_action(new_name='tasks test')
@cli_utils.action_logging
def test(args, dag=None):
# We want log outout from operators etc to show up here. Normally
@@ -738,6 +827,7 @@ def test(args, dag=None):
logging.getLogger('airflow.task').propagate = False
+@cli_utils.deprecated_action(new_name='tasks render')
@cli_utils.action_logging
def render(args):
dag = get_dag(args)
@@ -753,6 +843,7 @@ def render(args):
""".format(attr, getattr(task, attr))))
+@cli_utils.deprecated_action(new_name='tasks clear')
@cli_utils.action_logging
def clear(args):
logging.basicConfig(
@@ -1249,6 +1340,7 @@ def _serve_logs(env, skip_serve_logs=False):
return None
+@cli_utils.deprecated_action(new_name='kubernetes generate-dag-yaml')
@cli_utils.action_logging
def kubernetes_generate_dag_yaml(args):
from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubeConfig
@@ -1372,6 +1464,7 @@ Happy Airflowing!
print(output_string)
+@cli_utils.deprecated_action(new_name='celery worker')
@cli_utils.action_logging
def worker(args):
env = os.environ.copy()
@@ -1447,6 +1540,7 @@ def worker(args):
sp.kill()
+@cli_utils.deprecated_action(new_name='db init')
def initdb(args): # noqa
py2_deprecation_waring()
print("DB: " + repr(settings.engine.url))
@@ -1454,6 +1548,7 @@ def initdb(args): # noqa
print("Done.")
+@cli_utils.deprecated_action(new_name='db reset')
def resetdb(args):
py2_deprecation_waring()
print("DB: " + repr(settings.engine.url))
@@ -1465,6 +1560,7 @@ def resetdb(args):
print("Bail.")
+@cli_utils.deprecated_action(new_name='db shell')
@cli_utils.action_logging
def shell(args):
"""Run a shell that allows to access database access"""
@@ -1506,6 +1602,7 @@ def upgradedb(args): # noqa
db.upgradedb()
+@cli_utils.deprecated_action(new_name='db check')
@cli_utils.action_logging
def checkdb(args): # noqa
py2_deprecation_waring()
@@ -1522,6 +1619,26 @@ alternative_conn_specs = ['conn_type', 'conn_host',
'conn_login', 'conn_password', 'conn_schema', 'conn_port']
+# Compatibility shim for the new-style ``connections`` sub-commands: sets the ``list``/``delete``/``add``
+# attributes on ``args`` (unset ones become ``None``) and then calls the legacy ``connections(args)`` handler.
+def _conn_wrapper(args, list=None, delete=None, add=None):
+ args.list = list
+ args.delete = delete
+ args.add = add
+ connections(args)
+
+
+def connections_list(args):
+ _conn_wrapper(args, list=True)
+
+
+def connections_add(args):
+ _conn_wrapper(args, add=True)
+
+
+def connections_delete(args):
+ _conn_wrapper(args, delete=True)
+
+
+@cli_utils.deprecated_action(sub_commands=True)
@cli_utils.action_logging
def connections(args):
if args.list:
@@ -1655,6 +1772,7 @@ def connections(args):
return
+@cli_utils.deprecated_action(new_name='celery flower')
@cli_utils.action_logging
def flower(args):
broka = conf.get('celery', 'BROKER_URL')
@@ -1734,6 +1852,7 @@ def kerberos(args): # noqa
airflow.security.kerberos.run(principal=args.principal, keytab=args.keytab)
+@cli_utils.deprecated_action(new_name='users create')
@cli_utils.action_logging
def create_user(args):
fields = {
@@ -1774,6 +1893,7 @@ def create_user(args):
raise SystemExit('Failed to create user.')
+@cli_utils.deprecated_action(new_name='users delete')
@cli_utils.action_logging
def delete_user(args):
if not args.username:
@@ -1792,6 +1912,7 @@ def delete_user(args):
raise SystemExit('Failed to delete user.')
+@cli_utils.deprecated_action(new_name='users list')
@cli_utils.action_logging
def list_users(args):
appbuilder = cached_appbuilder()
@@ -1805,6 +1926,7 @@ def list_users(args):
print(msg)
+@cli_utils.deprecated_action(new_name='dags list-runs')
@cli_utils.action_logging
def list_dag_runs(args, dag=None):
if dag:
@@ -1857,6 +1979,7 @@ def list_dag_runs(args, dag=None):
print(record)
+@cli_utils.deprecated_action(new_name='sync-perm')
@cli_utils.action_logging
def sync_perm(args): # noqa
if settings.RBAC:
@@ -1874,6 +1997,7 @@ def sync_perm(args): # noqa
print('The sync_perm command only works for rbac UI.')
+@cli_utils.deprecated_action(new_name='config list')
def config(args):
"""Show current application configuration"""
with io.StringIO() as output:
@@ -2282,17 +2406,935 @@ Please install apache-airflow-upgrade-check distribution from PyPI to perform up
""")
-class Arg(object):
- def __init__(self, flags=None, help=None, action=None, default=None, nargs=None,
- type=None, choices=None, metavar=None):
+# Used in Arg to enable `None` as a distinct value from "not passed"
+_UNSET = object()
+
+
+class Arg:
+ """Class to keep information about command line argument"""
+
+ # pylint: disable=redefined-builtin,unused-argument
+ def __init__(
+ self,
+ flags=_UNSET,
+ help=_UNSET,
+ action=_UNSET,
+ default=_UNSET,
+ nargs=_UNSET,
+ type=_UNSET,
+ choices=_UNSET,
+ required=_UNSET,
+ metavar=_UNSET,
+ ):
self.flags = flags
- self.help = help
- self.action = action
- self.default = default
- self.nargs = nargs
- self.type = type
- self.choices = choices
- self.metavar = metavar
+ self.kwargs = {}
+ for k, v in locals().items():
+ if v is _UNSET:
+ continue
+ if k in ("self", "flags"):
+ continue
+
+ self.kwargs[k] = v
+
+ # pylint: enable=redefined-builtin,unused-argument
+
+ def add_to_parser(self, parser):
+ """Add this argument to an ArgumentParser"""
+ parser.add_argument(*self.flags, **self.kwargs)
+
+
+def positive_int(value):
+ """Define a positive int type for an argument."""
+ try:
+ value = int(value)
+ if value > 0:
+ return value
+ except ValueError:
+ pass
+ raise argparse.ArgumentTypeError("invalid positive int value: '{}'".format(value))
+
+
+# Shared
+ARG_DAG_ID = Arg(("dag_id",), help="The id of the dag")
+ARG_TASK_ID = Arg(("task_id",), help="The id of the task")
+ARG_EXECUTION_DATE = Arg(("execution_date",), help="The execution date of the DAG", type=parsedate)
+ARG_TASK_REGEX = Arg(
+ ("-t", "--task-regex"), help="The regex to filter specific task_ids to backfill (optional)"
+)
+ARG_SUBDIR = Arg(
+ ("-S", "--subdir"),
+ help=(
+ "File location or directory from which to look for the dag. "
+ "Defaults to '[AIRFLOW_HOME]/dags' where [AIRFLOW_HOME] is the "
+ "value you set for 'AIRFLOW_HOME' config you set in 'airflow.cfg' "
+ ),
+ default='[AIRFLOW_HOME]/dags' if BUILD_DOCS else settings.DAGS_FOLDER,
+)
+ARG_START_DATE = Arg(("-s", "--start-date"), help="Override start_date YYYY-MM-DD", type=parsedate)
+ARG_END_DATE = Arg(("-e", "--end-date"), help="Override end_date YYYY-MM-DD", type=parsedate)
+ARG_OUTPUT_PATH = Arg(
+ (
+ "-o",
+ "--output-path",
+ ),
+ help="The output for generated yaml files",
+ type=str,
+ default="[CWD]" if BUILD_DOCS else os.getcwd(),
+)
+ARG_DRY_RUN = Arg(
+ ("-n", "--dry-run"),
+ help="Perform a dry run for each task. Only renders Template Fields for each task, nothing else",
+ action="store_true",
+)
+ARG_PID = Arg(("--pid",), help="PID file location", nargs='?')
+ARG_DAEMON = Arg(
+ ("-D", "--daemon"), help="Daemonize instead of running in the foreground", action="store_true"
+)
+ARG_STDERR = Arg(("--stderr",), help="Redirect stderr to this file")
+ARG_STDOUT = Arg(("--stdout",), help="Redirect stdout to this file")
+ARG_LOG_FILE = Arg(("-l", "--log-file"), help="Location of the log file")
+ARG_YES = Arg(
+ ("-y", "--yes"), help="Do not prompt to confirm reset. Use with care!", action="store_true", default=False
+)
+
+# list_dag_runs
+ARG_DAG_ID_OPT = Arg(("-d", "--dag-id"), help="The id of the dag")
+ARG_NO_BACKFILL = Arg(
+ ("--no-backfill",), help="filter all the backfill dagruns given the dag id", action="store_true"
+)
+ARG_STATE = Arg(("--state",), help="Only list the dag runs corresponding to the state")
+
+# backfill
+ARG_MARK_SUCCESS = Arg(
+ ("-m", "--mark-success"), help="Mark jobs as succeeded without running them", action="store_true"
+)
+ARG_LOCAL = Arg(("-l", "--local"), help="Run the task using the LocalExecutor", action="store_true")
+ARG_POOL = Arg(("--pool",), "Resource pool to use")
+
+# list_tasks
+ARG_TREE = Arg(("-t", "--tree"), help="Tree view", action="store_true")
+
+# clear
+ARG_UPSTREAM = Arg(("-u", "--upstream"), help="Include upstream tasks", action="store_true")
+ARG_ONLY_FAILED = Arg(("-f", "--only-failed"), help="Only failed jobs", action="store_true")
+ARG_ONLY_RUNNING = Arg(("-r", "--only-running"), help="Only running jobs", action="store_true")
+ARG_DOWNSTREAM = Arg(("-d", "--downstream"), help="Include downstream tasks", action="store_true")
+ARG_EXCLUDE_SUBDAGS = Arg(("-x", "--exclude-subdags"), help="Exclude subdags", action="store_true")
+ARG_EXCLUDE_PARENTDAG = Arg(
+ ("-X", "--exclude-parentdag"),
+ help="Exclude ParentDAGS if the task cleared is a part of a SubDAG",
+ action="store_true",
+)
+ARG_DAG_REGEX = Arg(
+ ("-R", "--dag-regex"), help="Search dag_id as regex instead of exact string", action="store_true"
+)
+
+# show_dag
+ARG_SAVE = Arg(("-s", "--save"), help="Saves the result to the indicated file.")
+
+ARG_IMGCAT = Arg(("--imgcat",), help="Displays graph using the imgcat tool.", action='store_true')
+
+# trigger_dag
+ARG_RUN_ID = Arg(("-r", "--run-id"), help="Helps to identify this run")
+ARG_CONF = Arg(('-c', '--conf'), help="JSON string that gets pickled into the DagRun's conf attribute")
+ARG_EXEC_DATE = Arg(("-e", "--exec-date"), help="The execution date of the DAG", type=parsedate)
+
+# pool
+ARG_POOL_NAME = Arg(("pool",), metavar='NAME', help="Pool name")
+ARG_POOL_SLOTS = Arg(("slots",), type=int, help="Pool slots")
+ARG_POOL_DESCRIPTION = Arg(("description",), help="Pool description")
+ARG_POOL_IMPORT = Arg(("file",), metavar="FILEPATH", help="Import pools from JSON file")
+ARG_POOL_EXPORT = Arg(("file",), metavar="FILEPATH", help="Export all pools to JSON file")
+
+# variables
+ARG_VAR = Arg(("key",), help="Variable key")
+ARG_VAR_VALUE = Arg(("value",), metavar='VALUE', help="Variable value")
+ARG_DEFAULT = Arg(
+ ("-d", "--default"), metavar="VAL", default=None, help="Default value returned if variable does not exist"
+)
+ARG_JSON = Arg(("-j", "--json"), help="Deserialize JSON variable", action="store_true")
+ARG_VAR_IMPORT = Arg(("file",), help="Import variables from JSON file")
+ARG_VAR_EXPORT = Arg(("file",), help="Export all variables to JSON file")
+
+# kerberos
+ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs='?')
+ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs='?', default=conf.get('kerberos', 'keytab'))
+# run
+# TODO(aoen): "force" is a poor choice of name here since it implies it overrides
+# all dependencies (not just past success), e.g. the ignore_depends_on_past
+# dependency. This flag should be deprecated and renamed to 'ignore_ti_state' and
+# the "ignore_all_dependencies" command should be called the"force" command
+# instead.
+ARG_INTERACTIVE = Arg(
+ ('-N', '--interactive'),
+ help='Do not capture standard output and error streams (useful for interactive debugging)',
+ action='store_true',
+)
+ARG_FORCE = Arg(
+ ("-f", "--force"),
+ help="Ignore previous task instance state, rerun regardless if task already succeeded/failed",
+ action="store_true",
+)
+ARG_RAW = Arg(("-r", "--raw"), argparse.SUPPRESS, "store_true")
+ARG_IGNORE_ALL_DEPENDENCIES = Arg(
+ ("-A", "--ignore-all-dependencies"),
+ help="Ignores all non-critical dependencies, including ignore_ti_state and ignore_task_deps",
+ action="store_true",
+)
+# TODO(aoen): ignore_dependencies is a poor choice of name here because it is too
+# vague (e.g. a task being in the appropriate state to be run is also a dependency
+# but is not ignored by this flag), the name 'ignore_task_dependencies' is
+# slightly better (as it ignores all dependencies that are specific to the task),
+# so deprecate the old command name and use this instead.
+ARG_IGNORE_DEPENDENCIES = Arg(
+ ("-i", "--ignore-dependencies"),
+ help="Ignore task-specific dependencies, e.g. upstream, depends_on_past, and retry delay dependencies",
+ action="store_true",
+)
+ARG_IGNORE_DEPENDS_ON_PAST = Arg(
+ ("-I", "--ignore-depends-on-past"),
+ help="Ignore depends_on_past dependencies (but respect upstream dependencies)",
+ action="store_true",
+)
+ARG_SHIP_DAG = Arg(
+ ("--ship-dag",), help="Pickles (serializes) the DAG and ships it to the worker", action="store_true"
+)
+ARG_PICKLE = Arg(("-p", "--pickle"), help="Serialized pickle object of the entire dag (used internally)")
+ARG_JOB_ID = Arg(("-j", "--job-id"), help=argparse.SUPPRESS)
+ARG_CFG_PATH = Arg(("--cfg-path",), help="Path to config file to use instead of airflow.cfg")
+
+# worker
+ARG_QUEUES = Arg(
+ ("-q", "--queues"),
+ help="Comma delimited list of queues to serve",
+ default=conf.get('celery', 'DEFAULT_QUEUE'),
+)
+ARG_CONCURRENCY = Arg(
+ ("-c", "--concurrency"),
+ type=int,
+ help="The number of worker processes",
+ default=conf.get('celery', 'worker_concurrency'),
+)
+ARG_CELERY_HOSTNAME = Arg(
+ ("-H", "--celery-hostname"),
+ help="Set the hostname of celery worker if you have multiple workers on a single machine",
+)
+
+# flower
+ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API")
+ARG_FLOWER_HOSTNAME = Arg(
+ ("-H", "--hostname"),
+ default=conf.get('celery', 'FLOWER_HOST'),
+ help="Set the hostname on which to run the server",
+)
+ARG_FLOWER_PORT = Arg(
+ ("-p", "--port"),
+ default=conf.get('celery', 'FLOWER_PORT'),
+ type=int,
+ help="The port on which to run the server",
+)
+ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower")
+ARG_FLOWER_URL_PREFIX = Arg(
+ ("-u", "--url-prefix"), default=conf.get('celery', 'FLOWER_URL_PREFIX'), help="URL prefix for Flower"
+)
+ARG_FLOWER_BASIC_AUTH = Arg(
+ ("-A", "--basic-auth"),
+ default=conf.get('celery', 'FLOWER_BASIC_AUTH'),
+ help=(
+ "Securing Flower with Basic Authentication. "
+ "Accepts user:password pairs separated by a comma. "
+ "Example: flower_basic_auth = user1:password1,user2:password2"
+ ),
+)
+ARG_TASK_PARAMS = Arg(("-t", "--task-params"), help="Sends a JSON params dict to the task")
+ARG_POST_MORTEM = Arg(
+ ("-m", "--post-mortem"), action="store_true", help="Open debugger on uncaught exception"
+)
+
+# connections
+ARG_CONN_ID = Arg(('conn_id',), help='Connection id, required to get/add/delete a connection', type=str)
+ARG_CONN_URI = Arg(
+ ('--conn-uri',), help='Connection URI, required to add a connection without conn_type', type=str
+)
+ARG_CONN_TYPE = Arg(
+ ('--conn-type',), help='Connection type, required to add a connection without conn_uri', type=str
+)
+ARG_CONN_HOST = Arg(('--conn-host',), help='Connection host, optional when adding a connection', type=str)
+ARG_CONN_LOGIN = Arg(('--conn-login',), help='Connection login, optional when adding a connection', type=str)
+ARG_CONN_PASSWORD = Arg(
+ ('--conn-password',), help='Connection password, optional when adding a connection', type=str
+)
+ARG_CONN_SCHEMA = Arg(
+ ('--conn-schema',), help='Connection schema, optional when adding a connection', type=str
+)
+ARG_CONN_PORT = Arg(('--conn-port',), help='Connection port, optional when adding a connection', type=str)
+ARG_CONN_EXTRA = Arg(
+ ('--conn-extra',), help='Connection `Extra` field, optional when adding a connection', type=str
+)
+
+# users
+ARG_USERNAME = Arg(('-u', '--username'), help='Username of the user', required=True, type=str)
+ARG_FIRSTNAME = Arg(('-f', '--firstname'), help='First name of the user', required=True, type=str)
+ARG_LASTNAME = Arg(('-l', '--lastname'), help='Last name of the user', required=True, type=str)
+ARG_ROLE = Arg(
+ ('-r', '--role'),
+ help='Role of the user. Existing roles include Admin, User, Op, Viewer, and Public',
+ required=True,
+ type=str,
+)
+ARG_EMAIL = Arg(('-e', '--email'), help='Email of the user', required=True, type=str)
+ARG_PASSWORD = Arg(
+ ('-p', '--password'),
+ help='Password of the user, required to create a user without --use-random-password',
+ type=str,
+)
+ARG_USE_RANDOM_PASSWORD = Arg(
+ ('--use-random-password',),
+ help='Do not prompt for password. Use random string instead.'
+ ' Required to create a user without --password ',
+ default=False,
+ action='store_true',
+)
+
+# worker (autoscale and log serving)
+ARG_AUTOSCALE = Arg(('-a', '--autoscale'), help="Minimum and Maximum number of worker to autoscale")
+ARG_SKIP_SERVE_LOGS = Arg(
+ ("-s", "--skip-serve-logs"),
+ default=False,
+ help="Don't start the serve logs process along with the workers",
+ action="store_true",
+)
+
+ALTERNATIVE_CONN_SPECS_ARGS = [
+ ARG_CONN_TYPE,
+ ARG_CONN_HOST,
+ ARG_CONN_LOGIN,
+ ARG_CONN_PASSWORD,
+ ARG_CONN_SCHEMA,
+ ARG_CONN_PORT,
+]
+
+# A special "argument" (that is hidden from help) that sets `args.deprecation_warning=False` in the resulting
+# Namespace. Add this to any commands that use the same implementation function in new and old names to
+# suppress the warning for the new form.
+NOT_DEPRECATED = Arg(("--deprecation_warning",), help=argparse.SUPPRESS, default=False, required=False)
+
+_ActionCommand = namedtuple('ActionCommand', ['name', 'help', 'func', 'args', 'description', 'epilog',
+ 'prog'])
+_GroupCommand = namedtuple('GroupCommand', ['name', 'help', 'subcommands', 'description', 'epilog'])
+
+_ActionCommand.__new__.__defaults__ = (None,) * len(_ActionCommand._fields) # type: ignore
+_GroupCommand.__new__.__defaults__ = (None,) * len(_GroupCommand._fields) # type: ignore
+
+ActionCommand = cast(Any, _ActionCommand)
+GroupCommand = cast(Any, _GroupCommand)
+
+
+DAGS_COMMANDS = (
+ ActionCommand(
+ name='list',
+ help="List all the DAGs",
+ func=list_dags,
+ args=(ARG_SUBDIR, NOT_DEPRECATED),
+ ),
+ ActionCommand(
+ name='report',
+ help='Show DagBag loading report',
+ func=list_dags_report,
+ args=(ARG_SUBDIR, NOT_DEPRECATED),
+ ),
+ ActionCommand(
+ name='list-runs',
+ help="List DAG runs given a DAG id",
+ description=(
+ "List DAG runs given a DAG id. If state option is given, it will only search for all the "
+ "dagruns with the given state. If no_backfill option is given, it will filter out all "
+ "backfill dagruns for given dag id. If start_date is given, it will filter out all the "
+ "dagruns that were executed before this date. If end_date is given, it will filter out "
+ "all the dagruns that were executed after this date. "
+ ),
+ func=list_dag_runs,
+ args=(ARG_DAG_ID_OPT, ARG_NO_BACKFILL, ARG_STATE, NOT_DEPRECATED),
+ ),
+ ActionCommand(
+ name='state',
+ help="Get the status of a dag run",
+ func=dag_state,
+ args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, NOT_DEPRECATED),
+ ),
+ ActionCommand(
+ name='next-execution',
+ help="Get the next execution datetimes of a DAG",
+ description=(
+ "Get the next execution datetimes of a DAG. It returns one execution unless the "
+ "num-executions option is given"
+ ),
+ func=next_execution,
+ args=(ARG_DAG_ID, ARG_SUBDIR, NOT_DEPRECATED),
+ ),
+ ActionCommand(
+ name='pause',
+ help='Pause a DAG',
+ func=pause,
+ args=(ARG_DAG_ID, ARG_SUBDIR, NOT_DEPRECATED),
+ ),
+ ActionCommand(
+ name='unpause',
+ help='Resume a paused DAG',
+ func=unpause,
+ args=(ARG_DAG_ID, ARG_SUBDIR, NOT_DEPRECATED),
+ ),
+ ActionCommand(
+ name='trigger',
+ help='Trigger a DAG run',
+ func=trigger_dag,
+ args=(ARG_DAG_ID, ARG_SUBDIR, ARG_RUN_ID, ARG_CONF, ARG_EXEC_DATE, NOT_DEPRECATED),
+ ),
+ ActionCommand(
+ name='delete',
+ help="Delete all DB records related to the specified DAG",
+ func=delete_dag,
+ args=(ARG_DAG_ID, ARG_YES, NOT_DEPRECATED),
+ ),
+ ActionCommand(
+ name='show',
+ help="Displays DAG's tasks with their dependencies",
+ description=(
+ "The --imgcat option only works in iTerm.\n"
+ "\n"
+ "For more information, see: https://www.iterm2.com/documentation-images.html\n"
+ "\n"
+ "The --save option saves the result to the indicated file.\n"
+ "\n"
+ "The file format is determined by the file extension. "
+ "For more information about supported "
+ "format, see: https://www.graphviz.org/doc/info/output.html\n"
+ "\n"
+ "If you want to create a PNG file then you should execute the following command:\n"
+ "airflow dags show <DAG_ID> --save output.png\n"
+ "\n"
+ "If you want to create a DOT file then you should execute the following command:\n"
+ "airflow dags show <DAG_ID> --save output.dot\n"
+ ),
+ func=show_dag,
+ args=(
+ ARG_DAG_ID,
+ ARG_SUBDIR,
+ ARG_SAVE,
+ ARG_IMGCAT,
+ NOT_DEPRECATED,
+ ),
+ ),
+)
+# Actions exposed under the ``airflow tasks`` group (wired up in ``airflow_commands``).
+# Every entry lists NOT_DEPRECATED among its args -- presumably this suppresses the
+# deprecation warning emitted for the old-style 1.10 command names; confirm against
+# the definition of NOT_DEPRECATED.
+TASKS_COMMANDS = (
+    ActionCommand(
+        name='list',
+        help="List the tasks within a DAG",
+        func=list_tasks,
+        args=(
+            ARG_DAG_ID,
+            ARG_TREE,
+            ARG_SUBDIR,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='clear',
+        help="Clear a set of task instance, as if they never ran",
+        func=clear,
+        args=(
+            # Selection of DAG(s), tasks and date window
+            ARG_DAG_ID, ARG_TASK_REGEX, ARG_START_DATE, ARG_END_DATE, ARG_SUBDIR,
+            # Scope / filtering switches
+            ARG_UPSTREAM, ARG_DOWNSTREAM, ARG_YES, ARG_ONLY_FAILED, ARG_ONLY_RUNNING,
+            ARG_EXCLUDE_SUBDAGS, ARG_EXCLUDE_PARENTDAG, ARG_DAG_REGEX,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='state',
+        help="Get the status of a task instance",
+        func=task_state,
+        args=(
+            ARG_DAG_ID,
+            ARG_TASK_ID,
+            ARG_EXECUTION_DATE,
+            ARG_SUBDIR,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='failed-deps',
+        help="Returns the unmet dependencies for a task instance",
+        description=(
+            "Returns the unmet dependencies for a task instance from the perspective of the scheduler. "
+            "In other words, why a task instance doesn't get scheduled and then queued by the scheduler, "
+            "and then run by an executor."
+        ),
+        func=task_failed_deps,
+        args=(
+            ARG_DAG_ID,
+            ARG_TASK_ID,
+            ARG_EXECUTION_DATE,
+            ARG_SUBDIR,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='render',
+        help="Render a task instance's template(s)",
+        func=render,
+        args=(
+            ARG_DAG_ID,
+            ARG_TASK_ID,
+            ARG_EXECUTION_DATE,
+            ARG_SUBDIR,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='run',
+        help="Run a single task instance",
+        func=run,
+        args=(
+            # Which task instance to run
+            ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE, ARG_SUBDIR,
+            # How to run it
+            ARG_MARK_SUCCESS, ARG_FORCE, ARG_POOL, ARG_CFG_PATH, ARG_LOCAL, ARG_RAW,
+            ARG_IGNORE_ALL_DEPENDENCIES, ARG_IGNORE_DEPENDENCIES, ARG_IGNORE_DEPENDS_ON_PAST,
+            ARG_SHIP_DAG, ARG_PICKLE, ARG_JOB_ID, ARG_INTERACTIVE,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='test',
+        help="Test a task instance",
+        description=(
+            "Test a task instance. This will run a task without checking for dependencies or recording "
+            "its state in the database"
+        ),
+        func=test,
+        args=(
+            ARG_DAG_ID, ARG_TASK_ID, ARG_EXECUTION_DATE, ARG_SUBDIR,
+            ARG_DRY_RUN, ARG_TASK_PARAMS, ARG_POST_MORTEM,
+            NOT_DEPRECATED,
+        ),
+    ),
+)
+# Actions exposed under the ``airflow pools`` group (wired up in ``airflow_commands``).
+# Each action is bound to the handler matching its name.
+POOLS_COMMANDS = (
+    ActionCommand(
+        name='list',
+        help='List pools',
+        func=pool_list,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='get',
+        help='Get pool size',
+        func=pool_get,
+        args=(
+            ARG_POOL_NAME,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='set',
+        help='Configure pool',
+        func=pool_set,
+        args=(
+            ARG_POOL_NAME,
+            ARG_POOL_SLOTS,
+            ARG_POOL_DESCRIPTION,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='delete',
+        help='Delete pool',
+        func=pool_delete,
+        args=(
+            ARG_POOL_NAME,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='import',
+        help='Import pools',
+        func=pool_import,
+        args=(
+            ARG_POOL_IMPORT,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='export',
+        help='Export all pools',
+        # Must dispatch to the export handler; it was previously bound to
+        # pool_import, so ``airflow pools export`` ran the import code path.
+        # NOTE(review): pool_export is assumed to be defined alongside
+        # pool_import earlier in this module -- confirm.
+        func=pool_export,
+        args=(
+            ARG_POOL_EXPORT,
+            NOT_DEPRECATED,
+        ),
+    ),
+)
+# Actions exposed under the ``airflow variables`` group (wired up in ``airflow_commands``).
+VARIABLES_COMMANDS = (
+    ActionCommand(
+        name='get',
+        help='Get variable',
+        func=variables_get,
+        args=(
+            ARG_VAR,
+            ARG_JSON,
+            ARG_DEFAULT,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='set',
+        help='Set variable',
+        func=variables_set,
+        args=(
+            ARG_VAR,
+            ARG_VAR_VALUE,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='delete',
+        help='Delete variable',
+        func=variables_delete,
+        args=(
+            ARG_VAR,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='import',
+        help='Import variables',
+        func=variables_import,
+        args=(
+            ARG_VAR_IMPORT,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='export',
+        help='Export all variables',
+        func=variables_export,
+        args=(
+            ARG_VAR_EXPORT,
+            NOT_DEPRECATED,
+        ),
+    ),
+)
+# Actions exposed under the ``airflow db`` group (wired up in ``airflow_commands``).
+DB_COMMANDS = (
+    ActionCommand(
+        name='init',
+        help="Initialize the metadata database",
+        func=initdb,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='reset',
+        help="Burn down and rebuild the metadata database",
+        func=resetdb,
+        args=(ARG_YES, NOT_DEPRECATED),
+    ),
+    ActionCommand(
+        name='upgrade',
+        help="Upgrade the metadata database to latest version",
+        # Must run the schema migration. It was previously bound to
+        # upgrade_check, which is the separate "can I safely upgrade to the
+        # new version" checker and does not migrate the database.
+        # NOTE(review): upgradedb is assumed to be the existing 1.10 handler
+        # defined earlier in this module -- confirm.
+        func=upgradedb,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='shell',
+        help="Runs a shell to access the database",
+        func=shell,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='check',
+        help="Check if the database can be reached",
+        func=checkdb,
+        args=(NOT_DEPRECATED,),
+    ),
+)
+# Actions exposed under the ``airflow connections`` group (wired up in ``airflow_commands``).
+CONNECTIONS_COMMANDS = (
+    ActionCommand(
+        name='list',
+        help='List connections',
+        func=connections_list,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='add',
+        help='Add a connection',
+        func=connections_add,
+        # The fixed arguments come first, followed by every alternative
+        # connection-spec argument declared in ALTERNATIVE_CONN_SPECS_ARGS.
+        args=(
+            ARG_CONN_ID,
+            ARG_CONN_URI,
+            ARG_CONN_EXTRA,
+            NOT_DEPRECATED,
+        ) + tuple(ALTERNATIVE_CONN_SPECS_ARGS),
+    ),
+    ActionCommand(
+        name='delete',
+        help='Delete a connection',
+        func=connections_delete,
+        args=(
+            ARG_CONN_ID,
+            NOT_DEPRECATED,
+        ),
+    ),
+)
+
+# Actions exposed under the ``airflow users`` group (wired up in ``airflow_commands``).
+USERS_COMMANDS = (
+    ActionCommand(
+        name='list',
+        help='List users',
+        func=list_users,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='create',
+        help='Create a user',
+        func=create_user,
+        # Usage example appended after the argument help of this sub-command.
+        epilog=(
+            'examples:\n'
+            'To create an user with "Admin" role and username equals to "admin", run:\n'
+            '\n'
+            ' $ airflow users create \\\n'
+            ' --username admin \\\n'
+            ' --firstname FIRST_NAME \\\n'
+            ' --lastname LAST_NAME \\\n'
+            ' --role Admin \\\n'
+            ' --email admin@example.org'
+        ),
+        args=(
+            ARG_ROLE, ARG_USERNAME, ARG_EMAIL, ARG_FIRSTNAME, ARG_LASTNAME,
+            ARG_PASSWORD, ARG_USE_RANDOM_PASSWORD,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='delete',
+        help='Delete a user',
+        func=delete_user,
+        args=(
+            ARG_USERNAME,
+            NOT_DEPRECATED,
+        ),
+    ),
+)
+
+# Actions exposed under the ``airflow celery`` group (wired up in ``airflow_commands``).
+CELERY_COMMANDS = (
+    ActionCommand(
+        name='worker',
+        help="Start a Celery worker node",
+        func=worker,
+        args=(
+            # Worker-specific options
+            ARG_QUEUES, ARG_CONCURRENCY, ARG_CELERY_HOSTNAME,
+            # Process / logging options
+            ARG_PID, ARG_DAEMON, ARG_STDOUT, ARG_STDERR, ARG_LOG_FILE,
+            ARG_AUTOSCALE, ARG_SKIP_SERVE_LOGS,
+            NOT_DEPRECATED,
+        ),
+    ),
+    ActionCommand(
+        name='flower',
+        help="Start a Celery Flower",
+        func=flower,
+        args=(
+            # Flower-specific options
+            ARG_FLOWER_HOSTNAME, ARG_FLOWER_PORT, ARG_FLOWER_CONF,
+            ARG_FLOWER_URL_PREFIX, ARG_FLOWER_BASIC_AUTH, ARG_BROKER_API,
+            # Process / logging options
+            ARG_PID, ARG_DAEMON, ARG_STDOUT, ARG_STDERR, ARG_LOG_FILE,
+            NOT_DEPRECATED,
+        ),
+    ),
+)
+
+# Actions exposed under the ``airflow config`` group (wired up in ``airflow_commands``).
+CONFIG_COMMANDS = (
+    ActionCommand(name='list', help='List options for the configuration', func=config, args=(NOT_DEPRECATED,)),
+)
+
+# Actions exposed under the ``airflow kubernetes`` group (wired up in ``airflow_commands``).
+KUBERNETES_COMMANDS = (
+    ActionCommand(
+        name='generate-dag-yaml',
+        help=(
+            "Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
+            "launching into a cluster"
+        ),
+        func=kubernetes_generate_dag_yaml,
+        args=(
+            ARG_DAG_ID,
+            ARG_EXECUTION_DATE,
+            ARG_SUBDIR,
+            ARG_OUTPUT_PATH,
+            NOT_DEPRECATED,
+        ),
+    ),
+)
+
+# Top-level new-style CLI definition: a mix of command groups (GroupCommand)
+# and stand-alone actions (ActionCommand).
+airflow_commands = [
+    GroupCommand(name='dags', help='Manage DAGs', subcommands=DAGS_COMMANDS),
+    GroupCommand(
+        name="kubernetes",
+        help='Tools to help run the KubernetesExecutor',
+        subcommands=KUBERNETES_COMMANDS,
+    ),
+    GroupCommand(name='tasks', help='Manage tasks', subcommands=TASKS_COMMANDS),
+    GroupCommand(name='pools', help="Manage pools", subcommands=POOLS_COMMANDS),
+    GroupCommand(name='variables', help="Manage variables", subcommands=VARIABLES_COMMANDS),
+    GroupCommand(name='db', help="Database operations", subcommands=DB_COMMANDS),
+    ActionCommand(
+        name='kerberos',
+        help="Start a kerberos ticket renewer",
+        func=kerberos,
+        args=(
+            ARG_PRINCIPAL,
+            ARG_KEYTAB,
+            ARG_PID,
+            ARG_DAEMON,
+            ARG_STDOUT,
+            ARG_STDERR,
+            ARG_LOG_FILE,
+        ),
+    ),
+    GroupCommand(name='connections', help="Manage connections", subcommands=CONNECTIONS_COMMANDS),
+    GroupCommand(name='users', help="Manage users", subcommands=USERS_COMMANDS),
+    ActionCommand(
+        name='sync-perm',
+        help="Update permissions for existing roles and DAGs",
+        func=sync_perm,
+        args=(NOT_DEPRECATED,),
+    ),
+    ActionCommand(
+        name='rotate-fernet-key',
+        func=rotate_fernet_key,
+        help='Rotate encrypted connection credentials and variables',
+        description=(
+            'Rotate all encrypted connection credentials and variables; see '
+            'https://airflow.apache.org/docs/stable/howto/secure-connections.html'
+            '#rotating-encryption-keys'
+        ),
+        args=(NOT_DEPRECATED,),
+    ),
+    GroupCommand(
+        name="config",
+        help='View configuration',
+        subcommands=CONFIG_COMMANDS,
+    ),
+    GroupCommand(
+        name="celery",
+        help='Celery components',
+        description=(
+            'Start celery components. Works only when using CeleryExecutor. For more information, see '
+            'https://airflow.apache.org/docs/stable/executor/celery.html'
+        ),
+        subcommands=CELERY_COMMANDS,
+    ),
+]
+# Lookup table from a top-level command/group name to its definition.
+ALL_COMMANDS_DICT = dict((command.name, command) for command in airflow_commands)
+# Command names selected by CLIFactory.get_parser when called with dag_parser=True.
+DAG_CLI_COMMANDS = {
+    'list_tasks',
+    'backfill',
+    'test',
+    'run',
+    'pause',
+    'unpause',
+    'list_dag_runs',
+}
+
+
+class AirflowHelpFormatter(argparse.HelpFormatter):
+    """
+    Help formatter that renders the sub-command listing in two sections.
+
+    Sub-commands whose definition in ``ALL_COMMANDS_DICT`` is a ``GroupCommand``
+    are listed under "Groups"; everything else is listed under "Commands".
+    """
+
+    def _format_action(self, action):
+        # Anything that is not the sub-parsers action gets the stock rendering.
+        if not isinstance(action, argparse._SubParsersAction):  # pylint: disable=protected-access
+            return super(AirflowHelpFormatter, self)._format_action(action)
+
+        invocation = self._format_action_invocation(action)
+        parts = ['%*s%s\n' % (self._current_indent, '', invocation)]
+
+        self._indent()
+        subactions = action._get_subactions()  # pylint: disable=protected-access
+
+        # partition() yields the non-matching entries first, the matching ones second.
+        action_subcommands, group_subcommands = partition(
+            lambda entry: isinstance(ALL_COMMANDS_DICT.get(entry.dest), GroupCommand),
+            subactions,
+        )
+
+        # Section 1: command groups.
+        parts.append("\n")
+        parts.append('%*s%s:\n' % (self._current_indent, '', "Groups"))
+        self._indent()
+        parts.extend(self._format_action(entry) for entry in group_subcommands)
+        self._dedent()
+
+        # Section 2: plain commands, leaving out parsers flagged as hidden
+        # from the top-level help.
+        parts.append("\n")
+        parts.append('%*s%s:\n' % (self._current_indent, '', "Commands"))
+        self._indent()
+        parts.extend(
+            self._format_action(entry)
+            for entry in action_subcommands
+            if not getattr(action.choices[entry.dest], 'hide_from_toplevel_help', False)
+        )
+        self._dedent()
+        self._dedent()
+
+        # return a single string
+        return self._join_parts(parts)
+
+
+def partition(pred, iterable):
+    """
+    Split ``iterable`` in two using the predicate ``pred``.
+
+    :param pred: callable applied to each entry
+    :param iterable: entries to split; consumed lazily through ``itertools.tee``
+    :return: a pair ``(entries where pred is false, entries where pred is true)``
+    """
+    # Python 2.7 (still supported by this module) names the function
+    # ``ifilterfalse``; only Python 3 provides ``itertools.filterfalse``.
+    filterfalse = getattr(itertools, 'filterfalse', None) or itertools.ifilterfalse
+    iter_1, iter_2 = itertools.tee(iterable)
+    return filterfalse(pred, iter_1), filter(pred, iter_2)
+
+
+def _sort_args(args):
+    """
+    Yield positional args in their given order, then optional args sorted by long flag.
+
+    An arg counts as optional when its first flag starts with ``-``. The sort
+    is case-insensitive.
+    """
+
+    def long_flag(arg):
+        """Return the second flag, or the only flag when there is just one."""
+        flags = arg.flags
+        if len(flags) == 1:
+            return flags[0]
+        return flags[1]
+
+    positional, optional = partition(lambda arg: arg.flags[0].startswith("-"), args)
+    for entry in positional:
+        yield entry
+    ordered = sorted(optional, key=lambda arg: long_flag(arg).lower())
+    for entry in ordered:
+        yield entry
+
+
+def _add_command(subparsers, sub):
+    """
+    Register ``sub`` as a parser on ``subparsers`` and fill it in by kind.
+
+    :return: whatever the kind-specific helper returns
+    :raises AirflowException: if ``sub`` is neither a GroupCommand nor an ActionCommand
+    """
+    parser_options = dict(
+        help=sub.help,
+        # Fall back to the short help when no long description is given.
+        description=sub.description or sub.help,
+        epilog=sub.epilog,
+    )
+    sub_proc = subparsers.add_parser(sub.name, **parser_options)
+    sub_proc.formatter_class = argparse.RawTextHelpFormatter
+
+    if isinstance(sub, GroupCommand):
+        return _add_group_command(sub, sub_proc)
+    if not isinstance(sub, ActionCommand):
+        raise AirflowException("Invalid command definition.")
+
+    if sub.prog:
+        sub_proc.prog = sub.prog
+    return _add_action_command(sub, sub_proc)
+
+
+def _add_action_command(sub, sub_proc):
+    """Add the args of ``sub`` to ``sub_proc`` (in ``_sort_args`` order) and bind its handler."""
+    ordered_args = _sort_args(sub.args)
+    for cli_arg in ordered_args:
+        cli_arg.add_to_parser(sub_proc)
+    # The dispatcher later calls ``args.func(args)``.
+    sub_proc.set_defaults(func=sub.func)
+
+
+def _add_group_command(sub, sub_proc):
+    """
+    Attach the sub-commands of the group ``sub`` to ``sub_proc``, sorted by name.
+
+    :return: tuple of ``(sub_proc, the nested sub-parsers action)``
+    """
+    children = sub.subcommands
+    nested = sub_proc.add_subparsers(dest="subcommand", metavar="COMMAND")
+    # A group on its own is not runnable; one of its sub-commands must be given.
+    nested.required = True
+
+    for child in sorted(children, key=lambda entry: entry.name):
+        _add_command(nested, child)
+    return sub_proc, nested
class CLIFactory(object):
@@ -2818,6 +3860,34 @@ class CLIFactory(object):
'reset_dag_run', 'rerun_failed_tasks', 'run_backwards'
)
}, {
+ 'func': generate_pod_template,
+ 'help': "Reads your airflow.cfg and migrates your configurations into a "
+ "airflow_template.yaml file. From this point a user can link"
+ "this file to airflow using the `pod_template_file` argument"
+ "and modify using the Kubernetes API",
+ 'args': ('output_path',),
+ }, {
+ 'func': serve_logs,
+ 'help': "Serve logs generate by worker",
+ 'args': tuple(),
+ }, {
+ 'func': webserver,
+ 'help': "Start a Airflow webserver instance",
+ 'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname',
+ 'pid', 'daemon', 'stdout', 'stderr', 'access_logfile',
+ 'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'),
+ }, {
+ 'help': 'Show the version',
+ 'func': version,
+ 'args': tuple(),
+ }, {
+ 'help': 'Show information about current Airflow and environment',
+ 'func': info,
+ 'args': ('anonymize', 'file_io', ),
+ },
+ )
+ deprecated_subparsers = (
+ {
'func': list_dag_runs,
'help': "List dag runs given a DAG id. If state option is given, it will only"
"search for all the dagruns with the given state. "
@@ -2839,14 +3909,6 @@ class CLIFactory(object):
'args': (
'dag_id', 'output_path', 'subdir', 'execution_date'
)
-
- }, {
- 'func': generate_pod_template,
- 'help': "Reads your airflow.cfg and migrates your configurations into a"
- "airflow_template.yaml file. From this point a user can link"
- "this file to airflow using the `pod_template_file` argument"
- "and modify using the Kubernetes API",
- 'args': ('output_path',),
}, {
'func': clear,
'help': "Clear a set of task instance, as if they never ran",
@@ -2884,11 +3946,6 @@ class CLIFactory(object):
"args": ('set', 'get', 'json', 'default',
'var_import', 'var_export', 'var_delete'),
}, {
- 'func': kerberos,
- 'help': "Start a kerberos ticket renewer",
- 'args': ('principal', 'keytab', 'pid',
- 'daemon', 'stdout', 'stderr', 'log_file'),
- }, {
'func': render,
'help': "Render a task instance's template(s)",
'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
@@ -2925,10 +3982,6 @@ class CLIFactory(object):
'help': "Get the status of a task instance",
'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),
}, {
- 'func': serve_logs,
- 'help': "Serve logs generate by worker",
- 'args': tuple(),
- }, {
'func': test,
'help': (
"Test a task instance. This will run a task without checking for "
@@ -2937,12 +3990,6 @@ class CLIFactory(object):
'dag_id', 'task_id', 'execution_date', 'subdir', 'dry_run',
'task_params', 'post_mortem'),
}, {
- 'func': webserver,
- 'help': "Start a Airflow webserver instance",
- 'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname',
- 'pid', 'daemon', 'stdout', 'stderr', 'access_logfile',
- 'error_logfile', 'log_file', 'ssl_cert', 'ssl_key', 'debug'),
- }, {
'func': resetdb,
'help': "Burn down and rebuild the metadata database",
'args': ('yes',),
@@ -2959,12 +4006,6 @@ class CLIFactory(object):
'help': "Runs a shell to access the database",
'args': tuple(),
}, {
- 'func': scheduler,
- 'help': "Start a scheduler instance",
- 'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
- 'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
- 'log_file'),
- }, {
'func': worker,
'help': "Start a Celery worker node",
'args': ('do_pickle', 'queues', 'concurrency', 'celery_hostname',
@@ -2975,10 +4016,6 @@ class CLIFactory(object):
'args': ('flower_hostname', 'flower_port', 'flower_conf', 'flower_url_prefix',
'flower_basic_auth', 'broker_api', 'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
}, {
- 'func': version,
- 'help': "Show the version",
- 'args': tuple(),
- }, {
'func': connections,
'help': "List/Add/Delete connections",
'args': ('list_connections', 'add_connection', 'delete_connection',
@@ -3020,11 +4057,6 @@ class CLIFactory(object):
'args': ('color', ),
},
{
- 'help': 'Show information about current Airflow and environment',
- 'func': info,
- 'args': ('anonymize', 'file_io', ),
- },
- {
'name': 'upgrade_check',
'help': 'Check if you can safely upgrade to the new version.',
'func': upgrade_check,
@@ -3032,27 +4064,88 @@ class CLIFactory(object):
'args': (),
},
)
- subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
- dag_subparsers = (
+ deprecated_dag_subparsers = (
'list_tasks', 'backfill', 'test', 'run', 'pause', 'unpause', 'list_dag_runs')
@classmethod
def get_parser(cls, dag_parser=False):
"""Creates and returns command line argument parser"""
+
+ deprecated_subparsers_dict = {sp['func'].__name__: sp for sp in cls.deprecated_subparsers}
+
class DefaultHelpParser(argparse.ArgumentParser):
"""Override argparse.ArgumentParser.error and use print_help instead of print_usage"""
def error(self, message):
self.print_help()
self.exit(2, '\n{} command error: {}, see help above.\n'.format(self.prog, message))
- parser = DefaultHelpParser()
- subparsers = parser.add_subparsers(
- help='sub-command help', dest='subcommand')
+
+ def parse_known_args(self, args, namespace):
+ # Compat hack for optional sub-arguments in Py 2.7
+ fake_opt = getattr(self, "_fake_optional_subparser", False) and \
+ (args == [] or args[0].startswith('-'))
+ if fake_opt:
+ args = ["deprecated_"] + args
+
+ args, remain = super(DefaultHelpParser, self).parse_known_args(args, namespace)
+
+ if fake_opt:
+ # So it doesn't show up as "deprecated_"
+ args.subcommand = self._fake_optional_subparser
+ return args, remain
+
+ parser = DefaultHelpParser(formatter_class=AirflowHelpFormatter)
+ subparsers = parser.add_subparsers(dest='subcommand', metavar="GROUP_OR_COMMAND")
subparsers.required = True
- subparser_list = cls.dag_subparsers if dag_parser else cls.subparsers_dict.keys()
- for sub in subparser_list:
- sub = cls.subparsers_dict[sub]
+ subparser_list = DAG_CLI_COMMANDS if dag_parser else ALL_COMMANDS_DICT.keys()
+ for sub_name in sorted(subparser_list):
+ action = _add_command(subparsers, ALL_COMMANDS_DICT[sub_name])
+
+ # Deprecated "mode select", and new sub-command version? Merge them
+ # so they both work, but don't show help for the deprecated
+ # options!
+ if sub_name in deprecated_subparsers_dict and action is not None:
+ sp, sub_subparsers = action
+ deprecated = deprecated_subparsers_dict.pop(sub_name)
+ sp.set_defaults(func=deprecated['func'])
+ if six.PY3:
+ sub_subparsers.required = False
+
+ for arg in deprecated['args']:
+ if 'dag_id' in arg and dag_parser:
+ continue
+ arg = cls.args[arg]
+ # Don't show these options in the help output
+ kwargs = arg.kwargs.copy()
+ kwargs['help'] = argparse.SUPPRESS
+ sp.add_argument(*arg.flags, **kwargs)
+ else:
+ # Py2 doesn't support optional subcommands, so we have to fake it
+ sp._fake_optional_subparser = sub_name
+ _add_command(sub_subparsers, ActionCommand(
+ prog=sp.prog,
+ name='deprecated_',
+ help=deprecated['help'],
+ func=deprecated['func'],
+ args=(cls.args[arg] for arg in deprecated['args']),
+ ))
+
+ if dag_parser:
+ subparser_list = [
+ (deprecated_subparsers_dict[name], False)
+ for name in cls.deprecated_dag_subparsers
+ ]
+ else:
+ current = zip(cls.subparsers, itertools.repeat(False))
+ deprecated = zip(deprecated_subparsers_dict.values(), itertools.repeat(True))
+ subparser_list = itertools.chain(current, deprecated)
+ for (sub, hide_from_toplevel_help) in subparser_list:
+ if hide_from_toplevel_help and BUILD_DOCS:
+ # Don't show the deprecated commands in the docs
+ continue
+
sp = subparsers.add_parser(sub['func'].__name__, help=sub['help'])
+ sp.hide_from_toplevel_help = hide_from_toplevel_help
sp.set_defaults(func=sub['func'])
if 'from_module' in sub:
try:
@@ -3065,10 +4158,7 @@ class CLIFactory(object):
if 'dag_id' in arg and dag_parser:
continue
arg = cls.args[arg]
- kwargs = {
- f: v
- for f, v in vars(arg).items() if f != 'flags' and v}
- sp.add_argument(*arg.flags, **kwargs)
+ sp.add_argument(*arg.flags, **arg.kwargs)
return parser
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 31c8c62..578f07c 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -21,17 +21,18 @@
Utilities module for cli
"""
from __future__ import absolute_import
+from __future__ import print_function
import functools
import getpass
import json
+import os
import socket
import struct
import sys
from argparse import Namespace
from datetime import datetime
from fcntl import ioctl
-from os import environ
from termios import TIOCGWINSZ
from airflow.models import Log
@@ -163,7 +164,7 @@ def get_terminal_size(fallback=(80, 20)):
# as when when stdout is piped to another program
pass
try:
- return int(environ.get('LINES')), int(environ.get('COLUMNS'))
+ return int(os.environ.get('LINES')), int(os.environ.get('COLUMNS'))
except TypeError:
return fallback
@@ -171,3 +172,63 @@ def get_terminal_size(fallback=(80, 20)):
def header(text, fillchar):
rows, columns = get_terminal_size()
print(" {} ".format(text).center(columns, fillchar))
+
+
+def deprecated_action(func=None, new_name=None, sub_commands=False):
+    """
+    Decorate a CLI handler so it prints a deprecation notice to stderr before running.
+
+    Usable both as ``@deprecated_action`` and as ``@deprecated_action(new_name=...)``.
+    The notice is skipped when ``args.deprecation_warning`` is falsy.
+
+    :param func: the handler to wrap; when omitted a decorator is returned
+    :param new_name: replacement command name, or a list of replacement names
+    :param sub_commands: if True, warn about the deprecated "mode" options instead
+        of the command name
+    :return: the wrapped handler, which returns whatever ``func`` returns
+    """
+    if not func:
+        return functools.partial(deprecated_action, new_name=new_name, sub_commands=sub_commands)
+
+    stream = sys.stderr
+    try:
+        from pip._vendor import colorama
+        WINDOWS = (sys.platform.startswith("win") or
+                   (sys.platform == 'cli' and os.name == 'nt'))
+        if WINDOWS:
+            stream = colorama.AnsiToWin32(sys.stderr)
+    except Exception:
+        # Colour is best-effort only: any failure to load colorama disables it.
+        colorama = None
+
+    def should_color():
+        # Don't colorize things if we do not have colorama or if told not to
+        if not colorama:
+            return False
+
+        real_stream = (
+            stream if not isinstance(stream, colorama.AnsiToWin32)
+            else stream.wrapped
+        )
+
+        # If the stream is a tty we should color it
+        if hasattr(real_stream, "isatty") and real_stream.isatty():
+            return True
+
+        if os.environ.get("TERM") and "color" in os.environ.get("TERM"):
+            return True
+
+        # If anything else we should not color it
+        return False
+
+    @functools.wraps(func)
+    def wrapper(args):
+        if getattr(args, 'deprecation_warning', True):
+            # Prefer the sub-command as typed; fall back to the handler name
+            # when the namespace has no (or an empty) ``subcommand``.
+            command = getattr(args, 'subcommand', None) or getattr(args, 'func', func).__name__
+            if sub_commands:
+                msg = (
+                    "The mode (-l, -d, etc) options to {!r} have been deprecated and removed in Airflow 2.0,"
+                    " please use the get/set/list subcommands instead"
+                ).format(command)
+            else:
+                prefix = "The {!r} command is deprecated and removed in Airflow 2.0, please use "
+                if isinstance(new_name, list):
+                    # Use the computed command so the message never shows ``None``.
+                    msg = prefix.format(command)
+                    new_names = list(map(repr, new_name))
+                    if len(new_names) > 1:
+                        msg += "{}, or {}".format(", ".join(new_names[:-1]), new_names[-1])
+                    else:
+                        # A lone name must not be rendered as ", or 'x'".
+                        msg += "".join(new_names)
+                    msg += " instead"
+                else:
+                    msg = (prefix + "{!r} instead").format(command, new_name)
+
+            if should_color():
+                msg = "".join([colorama.Fore.YELLOW, msg, colorama.Style.RESET_ALL])
+            print(msg, file=sys.stderr)
+        # Propagate the handler's result instead of silently dropping it.
+        return func(args)
+    return wrapper
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 7ef01dc..048f802 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -139,6 +139,8 @@ def create_mock_args(
args.ignore_dependencies = ignore_dependencies
args.force = force
args.interactive = interactive
+ # Needed for CLI deprecation warning decorator
+ args.subcommand = "fake-group"
return args
diff --git a/tests/cli/test_worker_initialisation.py b/tests/cli/test_worker_initialisation.py
index 99fee8e..b068830 100644
--- a/tests/cli/test_worker_initialisation.py
+++ b/tests/cli/test_worker_initialisation.py
@@ -28,7 +28,7 @@ from tests.test_utils.config import conf_vars
patch('airflow.utils.cli.action_logging', lambda x: x).start()
from airflow.bin import cli # noqa
-mock_args = Namespace(queues=1, concurrency=1)
+mock_args = Namespace(queues=1, concurrency=1, subcommand='worker')
class TestWorkerPrecheck(unittest.TestCase):