You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/11/17 18:21:05 UTC
[airflow] branch main updated: completed D400 for airflow/callbacks/* airflow/cli/* (#27721)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7ea8475128 completed D400 for airflow/callbacks/* airflow/cli/* (#27721)
7ea8475128 is described below
commit 7ea8475128009b348a82d122747ca1df2823e006
Author: Dov Benyomin Sohacheski <b...@kloud.email>
AuthorDate: Thu Nov 17 20:20:58 2022 +0200
completed D400 for airflow/callbacks/* airflow/cli/* (#27721)
---
airflow/callbacks/callback_requests.py | 2 ++
airflow/cli/cli_parser.py | 28 ++++++++--------
airflow/cli/commands/celery_command.py | 10 +++---
airflow/cli/commands/config_command.py | 6 ++--
airflow/cli/commands/connection_command.py | 14 ++++----
airflow/cli/commands/dag_command.py | 29 ++++++++--------
airflow/cli/commands/dag_processor_command.py | 4 +--
airflow/cli/commands/db_command.py | 16 ++++-----
airflow/cli/commands/info_command.py | 26 +++++++--------
airflow/cli/commands/jobs_command.py | 2 +-
airflow/cli/commands/kerberos_command.py | 4 +--
airflow/cli/commands/kubernetes_command.py | 8 ++---
airflow/cli/commands/legacy_commands.py | 2 +-
airflow/cli/commands/plugins_command.py | 2 +-
airflow/cli/commands/pool_command.py | 18 +++++-----
airflow/cli/commands/provider_command.py | 18 +++++-----
airflow/cli/commands/role_command.py | 16 +++++----
airflow/cli/commands/rotate_fernet_key_command.py | 4 +--
airflow/cli/commands/scheduler_command.py | 8 ++---
airflow/cli/commands/standalone_command.py | 34 +++++++++++--------
airflow/cli/commands/sync_perm_command.py | 4 +--
airflow/cli/commands/task_command.py | 40 ++++++++++++++---------
airflow/cli/commands/triggerer_command.py | 4 +--
airflow/cli/commands/user_command.py | 16 ++++-----
airflow/cli/commands/variable_command.py | 18 +++++-----
airflow/cli/commands/version_command.py | 4 +--
airflow/cli/commands/webserver_command.py | 27 ++++++++-------
airflow/cli/simple_table.py | 14 ++++----
28 files changed, 202 insertions(+), 176 deletions(-)
diff --git a/airflow/callbacks/callback_requests.py b/airflow/callbacks/callback_requests.py
index f0c33e79f8..8ec0187978 100644
--- a/airflow/callbacks/callback_requests.py
+++ b/airflow/callbacks/callback_requests.py
@@ -61,6 +61,8 @@ class CallbackRequest:
class TaskCallbackRequest(CallbackRequest):
"""
+ Task callback status information.
+
A Class with information about the success/failure TI callback to be executed. Currently, only failure
callbacks (when tasks are externally killed) and Zombies are run via DagFileProcessorProcess.
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index e2456d7937..a6ec776bd2 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -16,7 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Command-line interface"""
+"""Command-line interface."""
from __future__ import annotations
import argparse
@@ -44,7 +44,7 @@ BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ
def lazy_load_command(import_path: str) -> Callable:
- """Create a lazy loader for command"""
+ """Create a lazy loader for command."""
_, _, name = import_path.rpartition(".")
def command(*args, **kwargs):
@@ -57,10 +57,10 @@ def lazy_load_command(import_path: str) -> Callable:
class DefaultHelpParser(argparse.ArgumentParser):
- """CustomParser to display help message"""
+ """CustomParser to display help message."""
def _check_value(self, action, value):
- """Override _check_value and check conditionally added command"""
+ """Override _check_value and check conditionally added command."""
if action.dest == "subcommand" and value == "celery":
executor = conf.get("core", "EXECUTOR")
if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
@@ -105,7 +105,7 @@ class DefaultHelpParser(argparse.ArgumentParser):
super()._check_value(action, value)
def error(self, message):
- """Override error and use print_instead of print_usage"""
+ """Override error and use print_help instead of print_usage."""
self.print_help()
self.exit(2, f"\n{self.prog} command error: {message}, see help above.\n")
@@ -115,7 +115,7 @@ _UNSET = object()
class Arg:
- """Class to keep information about command line argument"""
+ """Class to keep information about command line argument."""
def __init__(
self,
@@ -141,7 +141,7 @@ class Arg:
self.kwargs[k] = v
def add_to_parser(self, parser: argparse.ArgumentParser):
- """Add this argument to an ArgumentParser"""
+ """Add this argument to an ArgumentParser."""
parser.add_argument(*self.flags, **self.kwargs)
@@ -163,12 +163,12 @@ def positive_int(*, allow_zero):
def string_list_type(val):
- """Parses comma-separated list and returns list of string (strips whitespace)"""
+ """Parses comma-separated list and returns list of strings (strips whitespace)."""
return [x.strip() for x in val.split(",")]
def string_lower_type(val):
- """Lowers arg"""
+ """Lowers arg."""
if not val:
return
return val.strip().lower()
@@ -982,7 +982,7 @@ ALTERNATIVE_CONN_SPECS_ARGS = [
class ActionCommand(NamedTuple):
- """Single CLI command"""
+ """Single CLI command."""
name: str
help: str
@@ -993,7 +993,7 @@ class ActionCommand(NamedTuple):
class GroupCommand(NamedTuple):
- """ClI command with subcommands"""
+ """CLI command with subcommands."""
name: str
help: str
@@ -2148,7 +2148,7 @@ class AirflowHelpFormatter(argparse.HelpFormatter):
@lru_cache(maxsize=None)
def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser:
- """Creates and returns command line argument parser"""
+ """Creates and returns command line argument parser."""
parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter)
subparsers = parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND")
subparsers.required = True
@@ -2163,10 +2163,10 @@ def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser:
def _sort_args(args: Iterable[Arg]) -> Iterable[Arg]:
- """Sort subcommand optional args, keep positional args"""
+ """Sort subcommand optional args, keep positional args."""
def get_long_option(arg: Arg):
- """Get long option from Arg.flags"""
+ """Get long option from Arg.flags."""
return arg.flags[0] if len(arg.flags) == 1 else arg.flags[1]
positional, optional = partition(lambda x: x.flags[0].startswith("-"), args)
diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py
index 3abcfd708b..2198062e6b 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Celery command"""
+"""Celery command."""
from __future__ import annotations
from multiprocessing import Process
@@ -39,7 +39,7 @@ WORKER_PROCESS_NAME = "worker"
@cli_utils.action_cli
def flower(args):
- """Starts Flower, Celery monitoring tool"""
+ """Starts Flower, Celery monitoring tool."""
options = [
"flower",
conf.get("celery", "BROKER_URL"),
@@ -84,7 +84,7 @@ def flower(args):
def _serve_logs(skip_serve_logs: bool = False) -> Process | None:
- """Starts serve_logs sub-process"""
+ """Starts serve_logs sub-process."""
if skip_serve_logs is False:
sub_proc = Process(target=serve_logs)
sub_proc.start()
@@ -103,7 +103,7 @@ def _run_worker(options, skip_serve_logs):
@cli_utils.action_cli
def worker(args):
- """Starts Airflow Celery worker"""
+ """Starts Airflow Celery worker."""
# Disable connection pool so that celery worker does not hold an unnecessary db connection
settings.reconfigure_orm(disable_connection_pool=True)
if not settings.validate_session():
@@ -205,7 +205,7 @@ def worker(args):
@cli_utils.action_cli
def stop_worker(args):
- """Sends SIGTERM to Celery worker"""
+ """Sends SIGTERM to Celery worker."""
# Read PID from file
if args.pid:
pid_file_path = args.pid
diff --git a/airflow/cli/commands/config_command.py b/airflow/cli/commands/config_command.py
index 784428a838..f3a1eecfee 100644
--- a/airflow/cli/commands/config_command.py
+++ b/airflow/cli/commands/config_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Config sub-commands"""
+"""Config sub-commands."""
from __future__ import annotations
import io
@@ -28,7 +28,7 @@ from airflow.utils.code_utils import get_terminal_formatter
def show_config(args):
- """Show current application configuration"""
+ """Show current application configuration."""
with io.StringIO() as output:
conf.write(output)
code = output.getvalue()
@@ -38,7 +38,7 @@ def show_config(args):
def get_value(args):
- """Get one value from configuration"""
+ """Get one value from configuration."""
if not conf.has_section(args.section):
raise SystemExit(f"The section [{args.section}] is not found in config.")
diff --git a/airflow/cli/commands/connection_command.py b/airflow/cli/commands/connection_command.py
index 68594e2e80..7737f4fa2c 100644
--- a/airflow/cli/commands/connection_command.py
+++ b/airflow/cli/commands/connection_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Connection sub-commands"""
+"""Connection sub-commands."""
from __future__ import annotations
import io
@@ -74,7 +74,7 @@ def connections_get(args):
@suppress_logs_and_warning
def connections_list(args):
- """Lists all connections at the command line"""
+ """Lists all connections at the command line."""
with create_session() as session:
query = session.query(Connection)
if args.conn_id:
@@ -132,7 +132,7 @@ def _is_stdout(fileio: io.TextIOWrapper) -> bool:
def _valid_uri(uri: str) -> bool:
- """Check if a URI is valid, by checking if both scheme and netloc are available"""
+ """Check if a URI is valid, by checking if both scheme and netloc are available."""
uri_parts = urlsplit(uri)
return uri_parts.scheme != "" and uri_parts.netloc != ""
@@ -149,7 +149,7 @@ def _get_connection_types() -> list[str]:
def connections_export(args):
- """Exports all connections to a file"""
+ """Exports all connections to a file."""
file_formats = [".yaml", ".json", ".env"]
if args.format:
warnings.warn("Option `--format` is deprecated. Use `--file-format` instead.", DeprecationWarning)
@@ -199,7 +199,7 @@ alternative_conn_specs = ["conn_type", "conn_host", "conn_login", "conn_password
@cli_utils.action_cli
def connections_add(args):
- """Adds new connection"""
+ """Adds new connection."""
has_uri = bool(args.conn_uri)
has_json = bool(args.conn_json)
has_type = bool(args.conn_type)
@@ -284,7 +284,7 @@ def connections_add(args):
@cli_utils.action_cli
def connections_delete(args):
- """Deletes connection from DB"""
+ """Deletes connection from DB."""
with create_session() as session:
try:
to_delete = session.query(Connection).filter(Connection.conn_id == args.conn_id).one()
@@ -299,7 +299,7 @@ def connections_delete(args):
@cli_utils.action_cli(check_db=False)
def connections_import(args):
- """Imports connections from a file"""
+ """Imports connections from a file."""
if os.path.exists(args.file):
_import_helper(args.file)
else:
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index c21d08c08e..0465c474ed 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Dag sub-commands"""
+"""Dag sub-commands."""
from __future__ import annotations
import ast
@@ -49,7 +49,7 @@ log = logging.getLogger(__name__)
@cli_utils.action_cli
def dag_backfill(args, dag=None):
- """Creates backfill job or dry run for a DAG or list of DAGs using regex"""
+ """Creates backfill job or dry run for a DAG or list of DAGs using regex."""
logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
signal.signal(signal.SIGTERM, sigint_handler)
@@ -139,7 +139,7 @@ def dag_backfill(args, dag=None):
@cli_utils.action_cli
def dag_trigger(args):
- """Creates a dag run for the specified dag"""
+ """Creates a dag run for the specified dag."""
api_client = get_current_api_client()
try:
message = api_client.trigger_dag(
@@ -152,7 +152,7 @@ def dag_trigger(args):
@cli_utils.action_cli
def dag_delete(args):
- """Deletes all DB records related to the specified dag"""
+ """Deletes all DB records related to the specified dag."""
api_client = get_current_api_client()
if (
args.yes
@@ -170,18 +170,18 @@ def dag_delete(args):
@cli_utils.action_cli
def dag_pause(args):
- """Pauses a DAG"""
+ """Pauses a DAG."""
set_is_paused(True, args)
@cli_utils.action_cli
def dag_unpause(args):
- """Unpauses a DAG"""
+ """Unpauses a DAG."""
set_is_paused(False, args)
def set_is_paused(is_paused, args):
- """Sets is_paused for DAG by a given dag_id"""
+ """Sets is_paused for DAG by a given dag_id."""
dag = DagModel.get_dagmodel(args.dag_id)
if not dag:
@@ -193,7 +193,7 @@ def set_is_paused(is_paused, args):
def dag_dependencies_show(args):
- """Displays DAG dependencies, save to file or show as imgcat image"""
+ """Displays DAG dependencies, save to file or show as imgcat image."""
dot = render_dag_dependencies(SerializedDagModel.get_dag_dependencies())
filename = args.save
imgcat = args.imgcat
@@ -212,7 +212,7 @@ def dag_dependencies_show(args):
def dag_show(args):
- """Displays DAG or saves it's graphic representation to the file"""
+ """Displays DAG or saves its graphic representation to the file."""
dag = get_dag(args.subdir, args.dag_id)
dot = render_dag(dag)
filename = args.save
@@ -323,7 +323,7 @@ def dag_next_execution(args):
@cli_utils.action_cli
@suppress_logs_and_warning
def dag_list_dags(args):
- """Displays dags with or without stats at the command line"""
+ """Displays dags with or without stats at the command line."""
dagbag = DagBag(process_subdir(args.subdir))
if dagbag.import_errors:
from rich import print as rich_print
@@ -348,7 +348,7 @@ def dag_list_dags(args):
@cli_utils.action_cli
@suppress_logs_and_warning
def dag_list_import_errors(args):
- """Displays dags with import errors on the command line"""
+ """Displays dags with import errors on the command line."""
dagbag = DagBag(process_subdir(args.subdir))
data = []
for filename, errors in dagbag.import_errors.items():
@@ -362,7 +362,7 @@ def dag_list_import_errors(args):
@cli_utils.action_cli
@suppress_logs_and_warning
def dag_report(args):
- """Displays dagbag stats at the command line"""
+ """Displays dagbag stats at the command line."""
dagbag = DagBag(process_subdir(args.subdir))
AirflowConsole().print_as(
data=dagbag.dagbag_stats,
@@ -381,7 +381,7 @@ def dag_report(args):
@suppress_logs_and_warning
@provide_session
def dag_list_jobs(args, dag=None, session=NEW_SESSION):
- """Lists latest n jobs"""
+ """Lists latest n jobs."""
queries = []
if dag:
args.dag_id = dag.dag_id
@@ -411,7 +411,7 @@ def dag_list_jobs(args, dag=None, session=NEW_SESSION):
@suppress_logs_and_warning
@provide_session
def dag_list_dag_runs(args, dag=None, session=NEW_SESSION):
- """Lists dag runs for a given DAG"""
+ """Lists dag runs for a given DAG."""
if dag:
args.dag_id = dag.dag_id
else:
@@ -484,6 +484,7 @@ def dag_test(args, dag=None, session=None):
@provide_session
@cli_utils.action_cli
def dag_reserialize(args, session: Session = NEW_SESSION):
+ """Delete all serialized DAGs and, unless ``args.clear_only`` is set, re-serialize them."""
session.query(SerializedDagModel).delete(synchronize_session=False)
if not args.clear_only:
diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py
index 4999a8dd8e..f8ce65663b 100644
--- a/airflow/cli/commands/dag_processor_command.py
+++ b/airflow/cli/commands/dag_processor_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""DagProcessor command"""
+"""DagProcessor command."""
from __future__ import annotations
import logging
@@ -47,7 +47,7 @@ def _create_dag_processor_manager(args) -> DagFileProcessorManager:
@cli_utils.action_cli
def dag_processor(args):
- """Starts Airflow Dag Processor Job"""
+ """Starts Airflow Dag Processor Job."""
if not conf.getboolean("scheduler", "standalone_dag_processor"):
raise SystemExit("The option [scheduler/standalone_dag_processor] must be True.")
diff --git a/airflow/cli/commands/db_command.py b/airflow/cli/commands/db_command.py
index 6f7a27e1c0..4064285db8 100644
--- a/airflow/cli/commands/db_command.py
+++ b/airflow/cli/commands/db_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Database sub-commands"""
+"""Database sub-commands."""
from __future__ import annotations
import os
@@ -32,14 +32,14 @@ from airflow.utils.process_utils import execute_interactive
def initdb(args):
- """Initializes the metadata database"""
+ """Initializes the metadata database."""
print("DB: " + repr(settings.engine.url))
db.initdb()
print("Initialization done")
def resetdb(args):
- """Resets the metadata database"""
+ """Resets the metadata database."""
print("DB: " + repr(settings.engine.url))
if not (args.yes or input("This will drop existing tables if they exist. Proceed? (y/n)").upper() == "Y"):
raise SystemExit("Cancelled")
@@ -48,7 +48,7 @@ def resetdb(args):
@cli_utils.action_cli(check_db=False)
def upgradedb(args):
- """Upgrades the metadata database"""
+ """Upgrades the metadata database."""
print("DB: " + repr(settings.engine.url))
if args.to_revision and args.to_version:
raise SystemExit("Cannot supply both `--to-revision` and `--to-version`.")
@@ -93,7 +93,7 @@ def upgradedb(args):
@cli_utils.action_cli(check_db=False)
def downgrade(args):
- """Downgrades the metadata database"""
+ """Downgrades the metadata database."""
if args.to_revision and args.to_version:
raise SystemExit("Cannot supply both `--to-revision` and `--to-version`.")
if args.from_version and args.from_revision:
@@ -139,13 +139,13 @@ def downgrade(args):
def check_migrations(args):
- """Function to wait for all airflow migrations to complete. Used for launching airflow in k8s"""
+ """Function to wait for all airflow migrations to complete. Used for launching airflow in k8s."""
db.check_migrations(timeout=args.migration_wait_timeout)
@cli_utils.action_cli(check_db=False)
def shell(args):
- """Run a shell that allows to access metadata database"""
+ """Run a shell that allows access to the metadata database."""
url = settings.engine.url
print("DB: " + repr(url))
@@ -198,7 +198,7 @@ all_tables = sorted(config_dict)
@cli_utils.action_cli(check_db=False)
def cleanup_tables(args):
- """Purges old records in metadata database"""
+ """Purges old records in metadata database."""
run_cleanup(
table_names=args.tables,
dry_run=args.dry_run,
diff --git a/airflow/cli/commands/info_command.py b/airflow/cli/commands/info_command.py
index 3ab1e73439..124271c8c9 100644
--- a/airflow/cli/commands/info_command.py
+++ b/airflow/cli/commands/info_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Config sub-commands"""
+"""Info sub-commands."""
from __future__ import annotations
import locale
@@ -43,13 +43,13 @@ class Anonymizer(Protocol):
"""Anonymizer protocol."""
def process_path(self, value) -> str:
- """Remove pii from paths"""
+ """Remove pii from paths."""
def process_username(self, value) -> str:
- """Remove pii from username"""
+ """Remove pii from username."""
def process_url(self, value) -> str:
- """Remove pii from URL"""
+ """Remove pii from URL."""
class NullAnonymizer(Anonymizer):
@@ -125,7 +125,7 @@ class PiiAnonymizer(Anonymizer):
class OperatingSystem:
- """Operating system"""
+ """Operating system."""
WINDOWS = "Windows"
LINUX = "Linux"
@@ -134,7 +134,7 @@ class OperatingSystem:
@staticmethod
def get_current() -> str | None:
- """Get current operating system"""
+ """Get current operating system."""
if os.name == "nt":
return OperatingSystem.WINDOWS
elif "linux" in sys.platform:
@@ -147,7 +147,7 @@ class OperatingSystem:
class Architecture:
- """Compute architecture"""
+ """Compute architecture."""
X86_64 = "x86_64"
X86 = "x86"
@@ -156,7 +156,7 @@ class Architecture:
@staticmethod
def get_current():
- """Get architecture"""
+ """Get architecture."""
return _MACHINE_TO_ARCHITECTURE.get(platform.machine().lower())
@@ -180,7 +180,7 @@ _MACHINE_TO_ARCHITECTURE = {
class AirflowInfo:
- """Renders information about Airflow instance"""
+ """Renders information about Airflow instance."""
def __init__(self, anonymizer):
self.anonymizer = anonymizer
@@ -308,7 +308,7 @@ class AirflowInfo:
return [(p.data["package-name"], p.version) for p in ProvidersManager().providers.values()]
def show(self, output: str, console: AirflowConsole | None = None) -> None:
- """Shows information about Airflow instance"""
+ """Shows information about Airflow instance."""
all_info = {
"Apache Airflow": self._airflow_info,
"System info": self._system_info,
@@ -330,7 +330,7 @@ class AirflowInfo:
)
def render_text(self, output: str) -> str:
- """Exports the info to string"""
+ """Exports the info to string."""
console = AirflowConsole(record=True)
with console.capture():
self.show(output=output, console=console)
@@ -338,7 +338,7 @@ class AirflowInfo:
class FileIoException(Exception):
- """Raises when error happens in FileIo.io integration"""
+ """Raised when an error happens in the File.io integration."""
@tenacity.retry(
@@ -349,7 +349,7 @@ class FileIoException(Exception):
after=tenacity.after_log(log, logging.DEBUG),
)
def _upload_text_to_fileio(content):
- """Upload text file to File.io service and return lnk"""
+ """Upload text file to File.io service and return link."""
resp = httpx.post("https://file.io", content=content)
if resp.status_code not in [200, 201]:
print(resp.json())
diff --git a/airflow/cli/commands/jobs_command.py b/airflow/cli/commands/jobs_command.py
index dbb9ebe5c5..c030b5ea9b 100644
--- a/airflow/cli/commands/jobs_command.py
+++ b/airflow/cli/commands/jobs_command.py
@@ -24,7 +24,7 @@ from airflow.utils.state import State
@provide_session
def check(args, session=None):
- """Checks if job(s) are still alive"""
+ """Checks if job(s) are still alive."""
if args.allow_multiple and not args.limit > 1:
raise SystemExit("To use option --allow-multiple, you must set the limit to a value greater than 1.")
if args.hostname and args.local:
diff --git a/airflow/cli/commands/kerberos_command.py b/airflow/cli/commands/kerberos_command.py
index f3d7263e19..4bbe3f6df9 100644
--- a/airflow/cli/commands/kerberos_command.py
+++ b/airflow/cli/commands/kerberos_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Kerberos command"""
+"""Kerberos command."""
from __future__ import annotations
import daemon
@@ -28,7 +28,7 @@ from airflow.utils.cli import setup_locations
@cli_utils.action_cli
def kerberos(args):
- """Start a kerberos ticket renewer"""
+ """Start a kerberos ticket renewer."""
print(settings.HEADER)
if args.daemon:
diff --git a/airflow/cli/commands/kubernetes_command.py b/airflow/cli/commands/kubernetes_command.py
index 371d02e33e..052bf339c0 100644
--- a/airflow/cli/commands/kubernetes_command.py
+++ b/airflow/cli/commands/kubernetes_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Kubernetes sub-commands"""
+"""Kubernetes sub-commands."""
from __future__ import annotations
import os
@@ -37,7 +37,7 @@ from airflow.utils.cli import get_dag
@cli_utils.action_cli
def generate_pod_yaml(args):
- """Generates yaml files for each task in the DAG. Used for testing output of KubernetesExecutor"""
+ """Generates yaml files for each task in the DAG. Used for testing output of KubernetesExecutor."""
execution_date = args.execution_date
dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
yaml_output_path = args.output_path
@@ -72,7 +72,7 @@ def generate_pod_yaml(args):
@cli_utils.action_cli
def cleanup_pods(args):
- """Clean up k8s pods in evicted/failed/succeeded/pending states"""
+ """Clean up k8s pods in evicted/failed/succeeded/pending states."""
namespace = args.namespace
min_pending_minutes = args.min_pending_minutes
@@ -148,7 +148,7 @@ def cleanup_pods(args):
def _delete_pod(name, namespace):
- """Helper Function for cleanup_pods"""
+ """Helper Function for cleanup_pods."""
core_v1 = client.CoreV1Api()
delete_options = client.V1DeleteOptions()
print(f'Deleting POD "{name}" from "{namespace}" namespace')
diff --git a/airflow/cli/commands/legacy_commands.py b/airflow/cli/commands/legacy_commands.py
index 5643879dc5..910c6e4427 100644
--- a/airflow/cli/commands/legacy_commands.py
+++ b/airflow/cli/commands/legacy_commands.py
@@ -50,7 +50,7 @@ COMMAND_MAP = {
def check_legacy_command(action, value):
- """Checks command value and raise error if value is in removed command"""
+ """Checks command value and raise error if value is in removed command."""
new_command = COMMAND_MAP.get(value)
if new_command is not None:
msg = f"`airflow {value}` command, has been removed, please use `airflow {new_command}`"
diff --git a/airflow/cli/commands/plugins_command.py b/airflow/cli/commands/plugins_command.py
index 2c9f1488c4..50ee583099 100644
--- a/airflow/cli/commands/plugins_command.py
+++ b/airflow/cli/commands/plugins_command.py
@@ -40,7 +40,7 @@ def _join_plugins_names(value: list[Any] | Any) -> str:
@suppress_logs_and_warning
def dump_plugins(args):
- """Dump plugins information"""
+ """Dump plugins information."""
plugins_info: list[dict[str, str]] = get_plugin_info()
if not plugins_manager.plugins:
print("No plugins loaded")
diff --git a/airflow/cli/commands/pool_command.py b/airflow/cli/commands/pool_command.py
index 0dfa86ab07..aa56ba8fea 100644
--- a/airflow/cli/commands/pool_command.py
+++ b/airflow/cli/commands/pool_command.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Pools sub-commands"""
+"""Pools sub-commands."""
from __future__ import annotations
import json
@@ -43,7 +43,7 @@ def _show_pools(pools, output):
@suppress_logs_and_warning
def pool_list(args):
- """Displays info of all the pools"""
+ """Displays info of all the pools."""
api_client = get_current_api_client()
pools = api_client.get_pools()
_show_pools(pools=pools, output=args.output)
@@ -51,7 +51,7 @@ def pool_list(args):
@suppress_logs_and_warning
def pool_get(args):
- """Displays pool info by a given name"""
+ """Displays pool info by a given name."""
api_client = get_current_api_client()
try:
pools = [api_client.get_pool(name=args.pool)]
@@ -63,7 +63,7 @@ def pool_get(args):
@cli_utils.action_cli
@suppress_logs_and_warning
def pool_set(args):
- """Creates new pool with a given name and slots"""
+ """Creates new pool with a given name and slots."""
api_client = get_current_api_client()
api_client.create_pool(name=args.pool, slots=args.slots, description=args.description)
print(f"Pool {args.pool} created")
@@ -72,7 +72,7 @@ def pool_set(args):
@cli_utils.action_cli
@suppress_logs_and_warning
def pool_delete(args):
- """Deletes pool by a given name"""
+ """Deletes pool by a given name."""
api_client = get_current_api_client()
try:
api_client.delete_pool(name=args.pool)
@@ -84,7 +84,7 @@ def pool_delete(args):
@cli_utils.action_cli
@suppress_logs_and_warning
def pool_import(args):
- """Imports pools from the file"""
+ """Imports pools from the file."""
if not os.path.exists(args.file):
raise SystemExit(f"Missing pools file {args.file}")
pools, failed = pool_import_helper(args.file)
@@ -94,13 +94,13 @@ def pool_import(args):
def pool_export(args):
- """Exports all of the pools to the file"""
+ """Exports all the pools to the file."""
pools = pool_export_helper(args.file)
print(f"Exported {len(pools)} pools to {args.file}")
def pool_import_helper(filepath):
- """Helps import pools from the json file"""
+ """Helps import pools from the json file."""
api_client = get_current_api_client()
with open(filepath) as poolfile:
@@ -120,7 +120,7 @@ def pool_import_helper(filepath):
def pool_export_helper(filepath):
- """Helps export all of the pools to the json file"""
+ """Helps export all the pools to the json file."""
api_client = get_current_api_client()
pool_dict = {}
pools = api_client.get_pools()
diff --git a/airflow/cli/commands/provider_command.py b/airflow/cli/commands/provider_command.py
index bb27467745..cae6056794 100644
--- a/airflow/cli/commands/provider_command.py
+++ b/airflow/cli/commands/provider_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Providers sub-commands"""
+"""Providers sub-commands."""
from __future__ import annotations
import re
@@ -52,7 +52,7 @@ def provider_get(args):
@suppress_logs_and_warning
def providers_list(args):
- """Lists all providers at the command line"""
+ """Lists all providers at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().providers.values()),
output=args.output,
@@ -66,7 +66,7 @@ def providers_list(args):
@suppress_logs_and_warning
def hooks_list(args):
- """Lists all hooks at the command line"""
+ """Lists all hooks at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().hooks.items()),
output=args.output,
@@ -82,7 +82,7 @@ def hooks_list(args):
@suppress_logs_and_warning
def connection_form_widget_list(args):
- """Lists all custom connection form fields at the command line"""
+ """Lists all custom connection form fields at the command line."""
AirflowConsole().print_as(
data=list(sorted(ProvidersManager().connection_form_widgets.items())),
output=args.output,
@@ -97,7 +97,7 @@ def connection_form_widget_list(args):
@suppress_logs_and_warning
def connection_field_behaviours(args):
- """Lists field behaviours"""
+ """Lists field behaviours."""
AirflowConsole().print_as(
data=list(ProvidersManager().field_behaviours.keys()),
output=args.output,
@@ -109,7 +109,7 @@ def connection_field_behaviours(args):
@suppress_logs_and_warning
def extra_links_list(args):
- """Lists all extra links at the command line"""
+ """Lists all extra links at the command line."""
AirflowConsole().print_as(
data=ProvidersManager().extra_links_class_names,
output=args.output,
@@ -121,7 +121,7 @@ def extra_links_list(args):
@suppress_logs_and_warning
def logging_list(args):
- """Lists all log task handlers at the command line"""
+ """Lists all log task handlers at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().logging_class_names),
output=args.output,
@@ -133,7 +133,7 @@ def logging_list(args):
@suppress_logs_and_warning
def secrets_backends_list(args):
- """Lists all secrets backends at the command line"""
+ """Lists all secrets backends at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().secrets_backend_class_names),
output=args.output,
@@ -145,7 +145,7 @@ def secrets_backends_list(args):
@suppress_logs_and_warning
def auth_backend_list(args):
- """Lists all API auth backend modules at the command line"""
+ """Lists all API auth backend modules at the command line."""
AirflowConsole().print_as(
data=list(ProvidersManager().auth_backend_module_names),
output=args.output,
diff --git a/airflow/cli/commands/role_command.py b/airflow/cli/commands/role_command.py
index 946bdc3de8..571db3eefe 100644
--- a/airflow/cli/commands/role_command.py
+++ b/airflow/cli/commands/role_command.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Roles sub-commands"""
+"""Roles sub-commands."""
from __future__ import annotations
import collections
@@ -33,7 +33,7 @@ from airflow.www.security import EXISTING_ROLES, AirflowSecurityManager
@suppress_logs_and_warning
def roles_list(args):
- """Lists all existing roles"""
+ """Lists all existing roles."""
appbuilder = cached_app().appbuilder
roles = appbuilder.sm.get_all_roles()
@@ -58,7 +58,7 @@ def roles_list(args):
@cli_utils.action_cli
@suppress_logs_and_warning
def roles_create(args):
- """Creates new empty role in DB"""
+ """Creates new empty role in DB."""
appbuilder = cached_app().appbuilder
for role_name in args.role:
appbuilder.sm.add_role(role_name)
@@ -68,7 +68,7 @@ def roles_create(args):
@cli_utils.action_cli
@suppress_logs_and_warning
def roles_delete(args):
- """Deletes role in DB"""
+ """Deletes role in DB."""
appbuilder = cached_app().appbuilder
for role_name in args.role:
@@ -133,21 +133,22 @@ def __roles_add_or_remove_permissions(args):
@cli_utils.action_cli
@suppress_logs_and_warning
def roles_add_perms(args):
- """Adds permissions to role in DB"""
+ """Adds permissions to role in DB."""
__roles_add_or_remove_permissions(args)
@cli_utils.action_cli
@suppress_logs_and_warning
def roles_del_perms(args):
- """Deletes permissions from role in DB"""
+ """Deletes permissions from role in DB."""
__roles_add_or_remove_permissions(args)
@suppress_logs_and_warning
def roles_export(args):
"""
- Exports all the roles from the data base to a file.
+ Exports all the roles from the database to a file.
+
Note, this function does not export the permissions associated for each role.
Strictly, it exports the role names into the passed role json file.
"""
@@ -166,6 +167,7 @@ def roles_export(args):
def roles_import(args):
"""
Import all the roles into the db from the given json file.
+
Note, this function does not import the permissions for different roles and import them as well.
Strictly, it imports the role names in the role json file passed.
"""
diff --git a/airflow/cli/commands/rotate_fernet_key_command.py b/airflow/cli/commands/rotate_fernet_key_command.py
index ca7e956587..f9e1873597 100644
--- a/airflow/cli/commands/rotate_fernet_key_command.py
+++ b/airflow/cli/commands/rotate_fernet_key_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Rotate Fernet key command"""
+"""Rotate Fernet key command."""
from __future__ import annotations
from airflow.models import Connection, Variable
@@ -24,7 +24,7 @@ from airflow.utils.session import create_session
@cli_utils.action_cli
def rotate_fernet_key(args):
- """Rotates all encrypted connection credentials and variables"""
+ """Rotates all encrypted connection credentials and variables."""
with create_session() as session:
for conn in session.query(Connection).filter(Connection.is_encrypted | Connection.is_extra_encrypted):
conn.rotate_fernet_key()
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index d6c3fb394a..a82ada1d52 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Scheduler command"""
+"""Scheduler command."""
from __future__ import annotations
import signal
@@ -57,7 +57,7 @@ def _run_scheduler_job(args):
@cli_utils.action_cli
def scheduler(args):
- """Starts Airflow Scheduler"""
+ """Starts Airflow Scheduler."""
print(settings.HEADER)
if args.daemon:
@@ -86,7 +86,7 @@ def scheduler(args):
def _serve_logs(skip_serve_logs: bool = False) -> Process | None:
- """Starts serve_logs sub-process"""
+ """Starts serve_logs sub-process."""
from airflow.configuration import conf
from airflow.utils.serve_logs import serve_logs
@@ -99,7 +99,7 @@ def _serve_logs(skip_serve_logs: bool = False) -> Process | None:
def _serve_health_check(enable_health_check: bool = False) -> Process | None:
- """Starts serve_health_check sub-process"""
+ """Starts serve_health_check sub-process."""
if enable_health_check:
sub_proc = Process(target=serve_health_check)
sub_proc.start()
diff --git a/airflow/cli/commands/standalone_command.py b/airflow/cli/commands/standalone_command.py
index b2d79f222c..2a5670e83f 100644
--- a/airflow/cli/commands/standalone_command.py
+++ b/airflow/cli/commands/standalone_command.py
@@ -55,7 +55,7 @@ class StandaloneCommand:
self.ready_delay = 3
def run(self):
- """Main run loop"""
+ """Main run loop."""
self.print_output("standalone", "Starting Airflow Standalone")
# Silence built-in logging at INFO
logging.getLogger("").setLevel(logging.WARNING)
@@ -116,7 +116,7 @@ class StandaloneCommand:
self.print_output("standalone", "Complete")
def update_output(self):
- """Drains the output queue and prints its contents to the screen"""
+ """Drains the output queue and prints its contents to the screen."""
while self.output_queue:
# Extract info
name, line = self.output_queue.popleft()
@@ -126,8 +126,9 @@ class StandaloneCommand:
def print_output(self, name: str, output):
"""
- Prints an output line with name and colouring. You can pass multiple
- lines to output if you wish; it will be split for you.
+ Prints an output line with name and colouring.
+
+ You can pass multiple lines to output if you wish; it will be split for you.
"""
color = {
"webserver": "green",
@@ -141,14 +142,16 @@ class StandaloneCommand:
def print_error(self, name: str, output):
"""
- Prints an error message to the console (this is the same as
- print_output but with the text red)
+ Prints an error message to the console.
+
+ This is the same as print_output but with the text red.
"""
self.print_output(name, colored(output, "red"))
def calculate_env(self):
"""
Works out the environment variables needed to run subprocesses.
+
We override some settings as part of being standalone.
"""
env = dict(os.environ)
@@ -217,7 +220,8 @@ class StandaloneCommand:
def port_open(self, port):
"""
Checks if the given port is listening on the local machine.
- (used to tell if webserver is alive)
+
+ Used to tell if webserver is alive.
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -231,8 +235,9 @@ class StandaloneCommand:
def job_running(self, job):
"""
- Checks if the given job name is running and heartbeating correctly
- (used to tell if scheduler is alive)
+ Checks if the given job name is running and heartbeating correctly.
+
+ Used to tell if scheduler is alive.
"""
recent = job.most_recent_job()
if not recent:
@@ -241,8 +246,9 @@ class StandaloneCommand:
def print_ready(self):
"""
- Prints the banner shown when Airflow is ready to go, with login
- details.
+ Prints the banner shown when Airflow is ready to go.
+
+ Includes login details.
"""
self.print_output("standalone", "")
self.print_output("standalone", "Airflow is ready")
@@ -260,6 +266,8 @@ class StandaloneCommand:
class SubCommand(threading.Thread):
"""
+ Execute a subcommand on another thread.
+
Thread that launches a process and then streams its output back to the main
command. We use threads to avoid using select() and raw filehandles, and the
complex logic that brings doing line buffering.
@@ -273,7 +281,7 @@ class SubCommand(threading.Thread):
self.env = env
def run(self):
- """Runs the actual process and captures it output to a queue"""
+ """Runs the actual process and captures its output to a queue."""
self.process = subprocess.Popen(
["airflow"] + self.command,
stdout=subprocess.PIPE,
@@ -284,7 +292,7 @@ class SubCommand(threading.Thread):
self.parent.output_queue.append((self.name, line))
def stop(self):
- """Call to stop this process (and thus this thread)"""
+ """Call to stop this process (and thus this thread)."""
self.process.terminate()
diff --git a/airflow/cli/commands/sync_perm_command.py b/airflow/cli/commands/sync_perm_command.py
index 343a02f016..6a92ef99d5 100644
--- a/airflow/cli/commands/sync_perm_command.py
+++ b/airflow/cli/commands/sync_perm_command.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Sync permission command"""
+"""Sync permission command."""
from __future__ import annotations
from airflow.utils import cli as cli_utils
@@ -24,7 +24,7 @@ from airflow.www.app import cached_app
@cli_utils.action_cli
def sync_perm(args):
- """Updates permissions for existing roles and DAGs"""
+ """Updates permissions for existing roles and DAGs."""
appbuilder = cached_app().appbuilder
print("Updating actions and resources for all existing roles")
# Add missing permissions for all the Base Views _before_ syncing/creating roles
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index bacab26aef..aaa4313a65 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Task sub-commands"""
+"""Task sub-commands."""
from __future__ import annotations
import datetime
@@ -146,7 +146,7 @@ def _get_ti(
create_if_necessary: CreateIfNecessary = False,
session: Session = NEW_SESSION,
) -> tuple[TaskInstance, bool]:
- """Get the task instance through DagRun.run_id, if that fails, get the TI the old way"""
+ """Get the task instance through DagRun.run_id, if that fails, get the TI the old way."""
if not exec_date_or_run_id and not create_if_necessary:
raise ValueError("Must provide `exec_date_or_run_id` if not `create_if_necessary`.")
if task.is_mapped:
@@ -179,7 +179,9 @@ def _get_ti(
def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None:
"""
- Runs the task in one of 3 modes
+ Runs the task based on a mode.
+
+ Any of the 3 modes are available:
- using LocalTaskJob
- as raw task
@@ -195,8 +197,9 @@ def _run_task_by_selected_method(args, dag: DAG, ti: TaskInstance) -> None:
def _run_task_by_executor(args, dag, ti):
"""
- Sends the task to the executor for execution. This can result in the task being started by another host
- if the executor implementation does
+ Sends the task to the executor for execution.
+
+ This can result in the task being started by another host if the executor implementation does so.
"""
pickle_id = None
if args.ship_dag:
@@ -231,7 +234,7 @@ def _run_task_by_executor(args, dag, ti):
def _run_task_by_local_task_job(args, ti):
- """Run LocalTaskJob, which monitors the raw task execution process"""
+ """Run LocalTaskJob, which monitors the raw task execution process."""
run_job = LocalTaskJob(
task_instance=ti,
mark_success=args.mark_success,
@@ -260,7 +263,7 @@ RAW_TASK_UNSUPPORTED_OPTION = [
def _run_raw_task(args, ti: TaskInstance) -> None:
- """Runs the main task handling code"""
+ """Runs the main task handling code."""
ti._run_raw_task(
mark_success=args.mark_success,
job_id=args.job_id,
@@ -276,7 +279,8 @@ def _extract_external_executor_id(args) -> str | None:
@contextmanager
def _capture_task_logs(ti: TaskInstance) -> Generator[None, None, None]:
- """Manage logging context for a task run
+ """
+ Manage logging context for a task run.
- Replace the root logger configuration with the airflow.task configuration
so we can capture logs from any custom loggers used in the task.
@@ -311,7 +315,8 @@ def _capture_task_logs(ti: TaskInstance) -> Generator[None, None, None]:
@cli_utils.action_cli(check_db=False)
def task_run(args, dag=None):
- """Run a single task instance.
+ """
+ Run a single task instance.
Note that there must be at least one DagRun for this to start,
i.e. it must have been scheduled and/or triggered previously.
@@ -385,6 +390,8 @@ def task_run(args, dag=None):
@cli_utils.action_cli(check_db=False)
def task_failed_deps(args):
"""
+ Get task instance dependencies that were not met.
+
Returns the unmet dependencies for a task instance from the perspective of the
scheduler (i.e. why a task instance doesn't get scheduled and then queued by the
scheduler, and then run by an executor).
@@ -426,7 +433,7 @@ def task_state(args):
@cli_utils.action_cli(check_db=False)
@suppress_logs_and_warning
def task_list(args, dag=None):
- """Lists the tasks within a DAG at the command line"""
+ """Lists the tasks within a DAG at the command line."""
dag = dag or get_dag(args.subdir, args.dag_id)
if args.tree:
dag.tree_view()
@@ -445,8 +452,9 @@ SUPPORTED_DEBUGGER_MODULES: list[str] = [
def _guess_debugger():
"""
- Trying to guess the debugger used by the user. When it doesn't find any user-installed debugger,
- returns ``pdb``.
+ Tries to guess the debugger used by the user.
+
+ When it doesn't find any user-installed debugger, returns ``pdb``.
List of supported debuggers:
@@ -467,7 +475,7 @@ def _guess_debugger():
@suppress_logs_and_warning
@provide_session
def task_states_for_dag_run(args, session=None):
- """Get the status of all task instances in a DagRun"""
+ """Get the status of all task instances in a DagRun."""
dag_run = (
session.query(DagRun)
.filter(DagRun.run_id == args.execution_date_or_run_id, DagRun.dag_id == args.dag_id)
@@ -510,7 +518,7 @@ def task_states_for_dag_run(args, session=None):
@cli_utils.action_cli(check_db=False)
def task_test(args, dag=None):
- """Tests task for a given dag_id"""
+ """Tests task for a given dag_id."""
# We want to log output from operators etc to show up here. Normally
# airflow.task would redirect to a file, but here we want it to propagate
# up to the normal airflow handler.
@@ -571,7 +579,7 @@ def task_test(args, dag=None):
@cli_utils.action_cli(check_db=False)
@suppress_logs_and_warning
def task_render(args):
- """Renders and displays templated fields for a given task"""
+ """Renders and displays templated fields for a given task."""
dag = get_dag(args.subdir, args.dag_id)
task = dag.get_task(task_id=args.task_id)
ti, _ = _get_ti(
@@ -592,7 +600,7 @@ def task_render(args):
@cli_utils.action_cli(check_db=False)
def task_clear(args):
- """Clears all task instances or only those matched by regex for a DAG(s)"""
+ """Clears all task instances or only those matched by regex for a DAG(s)."""
logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
if args.dag_id and not args.subdir and not args.dag_regex and not args.task_regex:
diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py
index cfe30bfb35..64755f3830 100644
--- a/airflow/cli/commands/triggerer_command.py
+++ b/airflow/cli/commands/triggerer_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Triggerer command"""
+"""Triggerer command."""
from __future__ import annotations
import signal
@@ -30,7 +30,7 @@ from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, si
@cli_utils.action_cli
def triggerer(args):
- """Starts Airflow Triggerer"""
+ """Starts Airflow Triggerer."""
settings.MASK_SECRETS_IN_LOGS = True
print(settings.HEADER)
job = TriggererJob(capacity=args.capacity)
diff --git a/airflow/cli/commands/user_command.py b/airflow/cli/commands/user_command.py
index 62ed4954ce..f1c806e942 100644
--- a/airflow/cli/commands/user_command.py
+++ b/airflow/cli/commands/user_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""User sub-commands"""
+"""User sub-commands."""
from __future__ import annotations
import functools
@@ -36,7 +36,7 @@ from airflow.www.app import cached_app
class UserSchema(Schema):
- """user collection item schema"""
+ """User collection item schema."""
id = fields.Int()
firstname = fields.Str(required=True)
@@ -48,7 +48,7 @@ class UserSchema(Schema):
@suppress_logs_and_warning
def users_list(args):
- """Lists users at the command line"""
+ """Lists users at the command line."""
appbuilder = cached_app().appbuilder
users = appbuilder.sm.get_all_users()
fields = ["id", "username", "email", "first_name", "last_name", "roles"]
@@ -60,7 +60,7 @@ def users_list(args):
@cli_utils.action_cli(check_db=True)
def users_create(args):
- """Creates new user in the DB"""
+ """Creates new user in the DB."""
appbuilder = cached_app().appbuilder
role = appbuilder.sm.find_role(args.role)
if not role:
@@ -104,7 +104,7 @@ def _find_user(args):
@cli_utils.action_cli
def users_delete(args):
- """Deletes user from DB"""
+ """Deletes user from DB."""
user = _find_user(args)
appbuilder = cached_app().appbuilder
@@ -117,7 +117,7 @@ def users_delete(args):
@cli_utils.action_cli
def users_manage_role(args, remove=False):
- """Deletes or appends user roles"""
+ """Deletes or appends user roles."""
user = _find_user(args)
appbuilder = cached_app().appbuilder
@@ -144,7 +144,7 @@ def users_manage_role(args, remove=False):
def users_export(args):
- """Exports all users to the json file"""
+ """Exports all users to the json file."""
appbuilder = cached_app().appbuilder
users = appbuilder.sm.get_all_users()
fields = ["id", "username", "email", "first_name", "last_name", "roles"]
@@ -171,7 +171,7 @@ def users_export(args):
@cli_utils.action_cli
def users_import(args):
- """Imports users from the json file"""
+ """Imports users from the json file."""
json_file = getattr(args, "import")
if not os.path.exists(json_file):
raise SystemExit(f"File '{json_file}' does not exist")
diff --git a/airflow/cli/commands/variable_command.py b/airflow/cli/commands/variable_command.py
index e3b472b343..009b4704aa 100644
--- a/airflow/cli/commands/variable_command.py
+++ b/airflow/cli/commands/variable_command.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Variable subcommands"""
+"""Variable subcommands."""
from __future__ import annotations
import json
@@ -31,7 +31,7 @@ from airflow.utils.session import create_session
@suppress_logs_and_warning
def variables_list(args):
- """Displays all of the variables"""
+ """Displays all the variables."""
with create_session() as session:
variables = session.query(Variable)
AirflowConsole().print_as(data=variables, output=args.output, mapper=lambda x: {"key": x.key})
@@ -39,7 +39,7 @@ def variables_list(args):
@suppress_logs_and_warning
def variables_get(args):
- """Displays variable by a given name"""
+ """Displays variable by a given name."""
try:
if args.default is None:
var = Variable.get(args.key, deserialize_json=args.json)
@@ -53,21 +53,21 @@ def variables_get(args):
@cli_utils.action_cli
def variables_set(args):
- """Creates new variable with a given name and value"""
+ """Creates new variable with a given name and value."""
Variable.set(args.key, args.value, serialize_json=args.json)
print(f"Variable {args.key} created")
@cli_utils.action_cli
def variables_delete(args):
- """Deletes variable by a given name"""
+ """Deletes variable by a given name."""
Variable.delete(args.key)
print(f"Variable {args.key} deleted")
@cli_utils.action_cli
def variables_import(args):
- """Imports variables from a given file"""
+ """Imports variables from a given file."""
if os.path.exists(args.file):
_import_helper(args.file)
else:
@@ -75,12 +75,12 @@ def variables_import(args):
def variables_export(args):
- """Exports all of the variables to the file"""
+ """Exports all the variables to the file."""
_variable_export_helper(args.file)
def _import_helper(filepath):
- """Helps import variables from the file"""
+ """Helps import variables from the file."""
with open(filepath) as varfile:
data = varfile.read()
@@ -104,7 +104,7 @@ def _import_helper(filepath):
def _variable_export_helper(filepath):
- """Helps export all of the variables to the file"""
+ """Helps export all the variables to the file."""
var_dict = {}
with create_session() as session:
qry = session.query(Variable).all()
diff --git a/airflow/cli/commands/version_command.py b/airflow/cli/commands/version_command.py
index 365e9b2316..d3b735951c 100644
--- a/airflow/cli/commands/version_command.py
+++ b/airflow/cli/commands/version_command.py
@@ -14,12 +14,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Version command"""
+"""Version command."""
from __future__ import annotations
import airflow
def version(args):
- """Displays Airflow version at the command line"""
+ """Displays Airflow version at the command line."""
print(airflow.__version__)
diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py
index 88e3c51384..a14f6a38e7 100644
--- a/airflow/cli/commands/webserver_command.py
+++ b/airflow/cli/commands/webserver_command.py
@@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Webserver command"""
+"""Webserver command."""
from __future__ import annotations
import hashlib
@@ -47,9 +47,10 @@ log = logging.getLogger(__name__)
class GunicornMonitor(LoggingMixin):
"""
- Runs forever, monitoring the child processes of @gunicorn_master_proc and
- restarting workers occasionally or when files in the plug-in directory
- has been modified.
+ Runs forever.
+
+ Monitors the child processes of @gunicorn_master_proc and restarts
+ workers occasionally or when files in the plug-in directory have been modified.
Each iteration of the loop traverses one edge of this state transition
diagram, where each state (node) represents
@@ -105,6 +106,8 @@ class GunicornMonitor(LoggingMixin):
def _generate_plugin_state(self) -> dict[str, float]:
"""
+ Get plugin states.
+
Generate dict of filenames and last modification time of all files in settings.PLUGINS_FOLDER
directory.
"""
@@ -119,7 +122,7 @@ class GunicornMonitor(LoggingMixin):
@staticmethod
def _get_file_hash(fname: str):
- """Calculate MD5 hash for file"""
+ """Calculate MD5 hash for file."""
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
@@ -127,7 +130,7 @@ class GunicornMonitor(LoggingMixin):
return hash_md5.hexdigest()
def _get_num_ready_workers_running(self) -> int:
- """Returns number of ready Gunicorn workers by looking for READY_PREFIX in process name"""
+ """Returns number of ready Gunicorn workers by looking for READY_PREFIX in process name."""
workers = psutil.Process(self.gunicorn_master_proc.pid).children()
def ready_prefix_on_cmdline(proc):
@@ -143,12 +146,12 @@ class GunicornMonitor(LoggingMixin):
return len(ready_workers)
def _get_num_workers_running(self) -> int:
- """Returns number of running Gunicorn workers processes"""
+ """Returns number of running Gunicorn workers processes."""
workers = psutil.Process(self.gunicorn_master_proc.pid).children()
return len(workers)
def _wait_until_true(self, fn, timeout: int = 0) -> None:
- """Sleeps until fn is true"""
+ """Sleeps until fn is true."""
start_time = time.monotonic()
while not fn():
if 0 < timeout <= time.monotonic() - start_time:
@@ -188,8 +191,10 @@ class GunicornMonitor(LoggingMixin):
def _reload_gunicorn(self) -> None:
"""
- Send signal to reload the gunicorn configuration. When gunicorn receive signals, it reload the
- configuration, start the new worker processes with a new configuration and gracefully
+ Send signal to reload the gunicorn configuration.
+
+ When gunicorn receives the signal, it reloads the configuration,
+ starts the new worker processes with a new configuration and gracefully
shutdown older workers.
"""
# HUP: Reload the configuration.
@@ -315,7 +320,7 @@ class GunicornMonitor(LoggingMixin):
@cli_utils.action_cli
def webserver(args):
- """Starts Airflow Webserver"""
+ """Starts Airflow Webserver."""
print(settings.HEADER)
# Check for old/insecure config, and fail safe (i.e. don't launch) if the config is wildly insecure.
diff --git a/airflow/cli/simple_table.py b/airflow/cli/simple_table.py
index 7c372ea1f2..87f7ce9f3b 100644
--- a/airflow/cli/simple_table.py
+++ b/airflow/cli/simple_table.py
@@ -32,7 +32,7 @@ from airflow.utils.platform import is_tty
class AirflowConsole(Console):
- """Airflow rich console"""
+ """Airflow rich console."""
def __init__(self, show_header: bool = True, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -43,17 +43,17 @@ class AirflowConsole(Console):
self.show_header = show_header
def print_as_json(self, data: dict):
- """Renders dict as json text representation"""
+ """Renders dict as json text representation."""
json_content = json.dumps(data)
self.print(Syntax(json_content, "json", theme="ansi_dark"), soft_wrap=True)
def print_as_yaml(self, data: dict):
- """Renders dict as yaml text representation"""
+ """Renders dict as yaml text representation."""
yaml_content = yaml.dump(data)
self.print(Syntax(yaml_content, "yaml", theme="ansi_dark"), soft_wrap=True)
def print_as_table(self, data: list[dict]):
- """Renders list of dictionaries as table"""
+ """Renders list of dictionaries as table."""
if not data:
self.print("No data found")
return
@@ -67,7 +67,7 @@ class AirflowConsole(Console):
self.print(table)
def print_as_plain_table(self, data: list[dict]):
- """Renders list of dictionaries as a simple table than can be easily piped"""
+ """Renders list of dictionaries as a simple table that can be easily piped."""
if not data:
self.print("No data found")
return
@@ -89,7 +89,7 @@ class AirflowConsole(Console):
return str(value)
def print_as(self, data: list[dict | Any], output: str, mapper: Callable | None = None):
- """Prints provided using format specified by output argument"""
+ """Prints provided data using format specified by output argument."""
output_to_renderer: dict[str, Callable[[Any], None]] = {
"json": self.print_as_json,
"yaml": self.print_as_yaml,
@@ -127,6 +127,6 @@ class SimpleTable(Table):
self.caption = kwargs.get("caption", " ")
def add_column(self, *args, **kwargs) -> None:
- """Add a column to the table. We use different default"""
+ """Add a column to the table. We use a different default."""
kwargs["overflow"] = kwargs.get("overflow") # to avoid truncating
super().add_column(*args, **kwargs)