Posted to commits@airflow.apache.org by po...@apache.org on 2021/03/03 09:32:26 UTC
[airflow] 02/41: Switch to f-strings using flynt. (#13732)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 95889128afec4ebb8a3dfe8a8e13a7264d1a3bd6
Author: Joshua Carp <jm...@gmail.com>
AuthorDate: Sat Jan 23 00:19:38 2021 -0500
Switch to f-strings using flynt. (#13732)
(cherry picked from commit a9ac2b040b64de1aa5d9c2b9def33334e36a8d22)
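For context: flynt mechanically rewrites %-style and str.format()-style interpolation
into f-strings. A minimal sketch of the conversion applied throughout this commit;
the snippets mirror patterns from the hunks below (get_code.py, pool.py) and are
illustrative only, not part of the commit itself:

    # Before: str.format() and percent formatting, as in the removed lines
    error_message = "Error {} while reading Dag id {} Code".format(str(exception), dag_id)
    raise PoolNotFound("Pool '%s' doesn't exist" % name)

    # After: equivalent f-strings, as in the added lines
    error_message = f"Error {str(exception)} while reading Dag id {dag_id} Code"
    raise PoolNotFound(f"Pool '{name}' doesn't exist")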
---
.pre-commit-config.yaml | 7 ++++
BREEZE.rst | 2 +-
STATIC_CODE_CHECKS.rst | 2 +
airflow/api/common/experimental/get_code.py | 2 +-
airflow/api/common/experimental/pool.py | 6 +--
.../api_connexion/endpoints/connection_endpoint.py | 2 +-
airflow/cli/commands/dag_command.py | 2 +-
airflow/cli/commands/task_command.py | 11 ++----
airflow/cli/commands/user_command.py | 8 ++--
airflow/cli/commands/variable_command.py | 2 +-
airflow/configuration.py | 2 +-
.../example_passing_params_via_test_command.py | 6 +--
airflow/example_dags/example_trigger_target_dag.py | 2 +-
airflow/example_dags/subdags/subdag.py | 2 +-
airflow/example_dags/tutorial_taskflow_api_etl.py | 2 +-
airflow/hooks/dbapi.py | 2 +-
airflow/kubernetes/refresh_config.py | 2 +-
airflow/models/connection.py | 6 +--
airflow/models/dag.py | 2 +-
airflow/models/taskinstance.py | 4 +-
airflow/models/xcom.py | 4 +-
airflow/operators/sql.py | 14 +++----
airflow/providers/amazon/aws/hooks/datasync.py | 4 +-
airflow/providers/amazon/aws/hooks/dynamodb.py | 4 +-
airflow/providers/amazon/aws/hooks/sagemaker.py | 4 +-
.../providers/amazon/aws/log/s3_task_handler.py | 2 +-
airflow/providers/amazon/aws/operators/datasync.py | 2 +-
.../amazon/aws/operators/emr_add_steps.py | 2 +-
.../amazon/aws/operators/emr_create_job_flow.py | 2 +-
.../amazon/aws/operators/emr_modify_cluster.py | 2 +-
.../amazon/aws/operators/emr_terminate_job_flow.py | 2 +-
.../amazon/aws/operators/sagemaker_endpoint.py | 2 +-
.../aws/operators/sagemaker_endpoint_config.py | 2 +-
.../amazon/aws/operators/sagemaker_model.py | 2 +-
.../amazon/aws/operators/sagemaker_processing.py | 2 +-
.../amazon/aws/operators/sagemaker_training.py | 2 +-
.../amazon/aws/operators/sagemaker_transform.py | 2 +-
.../amazon/aws/operators/sagemaker_tuning.py | 2 +-
.../providers/amazon/aws/sensors/sagemaker_base.py | 2 +-
airflow/providers/apache/druid/hooks/druid.py | 4 +-
airflow/providers/apache/hdfs/sensors/hdfs.py | 2 +-
airflow/providers/apache/hive/hooks/hive.py | 8 +---
.../providers/apache/hive/operators/hive_stats.py | 24 +++++-------
airflow/providers/apache/spark/hooks/spark_jdbc.py | 2 +-
.../providers/apache/spark/hooks/spark_submit.py | 8 ++--
.../providers/cncf/kubernetes/hooks/kubernetes.py | 6 +--
.../cncf/kubernetes/operators/kubernetes_pod.py | 3 +-
.../cncf/kubernetes/sensors/spark_kubernetes.py | 2 +-
airflow/providers/databricks/hooks/databricks.py | 2 +-
airflow/providers/docker/operators/docker_swarm.py | 2 +-
airflow/providers/ftp/hooks/ftp.py | 4 +-
airflow/providers/google/cloud/hooks/bigquery.py | 4 +-
airflow/providers/google/cloud/hooks/compute.py | 20 +++-------
airflow/providers/google/cloud/hooks/dataflow.py | 12 +++---
airflow/providers/google/cloud/hooks/functions.py | 4 +-
.../google/cloud/hooks/kubernetes_engine.py | 2 +-
.../providers/google/cloud/log/gcs_task_handler.py | 4 +-
.../providers/google/cloud/operators/bigquery.py | 4 +-
.../providers/google/cloud/operators/dataproc.py | 2 +-
.../jenkins/operators/jenkins_job_trigger.py | 6 +--
airflow/providers/jira/hooks/jira.py | 4 +-
airflow/providers/jira/operators/jira.py | 4 +-
.../azure/operators/azure_container_instances.py | 2 +-
.../providers/microsoft/winrm/operators/winrm.py | 2 +-
airflow/providers/mysql/hooks/mysql.py | 21 +++-------
airflow/providers/opsgenie/hooks/opsgenie_alert.py | 2 +-
airflow/providers/oracle/hooks/oracle.py | 2 +-
airflow/providers/pagerduty/hooks/pagerduty.py | 2 +-
airflow/providers/plexus/operators/job.py | 4 +-
airflow/providers/postgres/hooks/postgres.py | 2 +-
airflow/providers/qubole/hooks/qubole.py | 4 +-
airflow/providers/salesforce/hooks/salesforce.py | 2 +-
airflow/providers/sftp/operators/sftp.py | 2 +-
airflow/providers/ssh/operators/ssh.py | 2 +-
airflow/security/kerberos.py | 4 +-
airflow/security/utils.py | 2 +-
airflow/sensors/date_time.py | 2 +-
airflow/sensors/sql.py | 2 +-
airflow/utils/cli.py | 2 +-
airflow/utils/code_utils.py | 2 +-
airflow/utils/log/file_task_handler.py | 6 +--
airflow/utils/timezone.py | 2 +-
airflow/www/utils.py | 10 ++---
airflow/www/validators.py | 4 +-
airflow/www/views.py | 19 +++------
breeze-complete | 1 +
docs/exts/airflow_intersphinx.py | 2 +-
docs/exts/exampleinclude.py | 2 +-
docs/exts/redirects.py | 2 +-
metastore_browser/hive_metastore.py | 14 +++----
.../pre_commit_check_provider_yaml_files.py | 8 ++--
scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py | 6 +--
tests/api/common/experimental/test_pool.py | 2 +-
tests/cli/commands/test_connection_command.py | 14 +++----
tests/core/test_core.py | 2 +-
tests/dags/test_subdag.py | 2 +-
tests/dags_corrupted/test_impersonation_custom.py | 2 +-
tests/executors/test_celery_executor.py | 2 +-
tests/hooks/test_dbapi.py | 2 +-
tests/models/test_baseoperator.py | 4 +-
tests/models/test_connection.py | 2 +-
tests/models/test_dag.py | 4 +-
tests/models/test_dagbag.py | 6 +--
tests/models/test_renderedtifields.py | 2 +-
.../amazon/aws/hooks/test_batch_waiters.py | 2 +-
tests/providers/amazon/aws/hooks/test_s3.py | 4 +-
.../apache/hive/operators/test_hive_stats.py | 14 +++----
.../apache/hive/transfers/test_mysql_to_hive.py | 24 +++++-------
.../providers/apache/spark/hooks/test_spark_sql.py | 10 ++---
tests/providers/apache/sqoop/hooks/test_sqoop.py | 45 +++++++++-------------
.../elasticsearch/log/elasticmock/__init__.py | 2 +-
.../providers/google/cloud/hooks/test_cloud_sql.py | 4 +-
tests/providers/google/cloud/hooks/test_pubsub.py | 32 +++++++--------
.../google/cloud/operators/test_dataflow.py | 2 +-
.../google/cloud/operators/test_mlengine_utils.py | 6 +--
tests/providers/google/cloud/sensors/test_gcs.py | 2 +-
.../cloud/transfers/test_bigquery_to_bigquery.py | 2 +-
.../google/cloud/transfers/test_gcs_to_gcs.py | 2 +-
.../cloud/utils/test_mlengine_operator_utils.py | 6 +--
tests/providers/mysql/hooks/test_mysql.py | 6 +--
tests/serialization/test_dag_serialization.py | 4 +-
tests/test_utils/gcp_system_helpers.py | 2 +-
tests/test_utils/logging_command_executor.py | 2 +-
tests/test_utils/mock_operators.py | 2 +-
tests/utils/test_helpers.py | 4 +-
tests/www/api/experimental/test_endpoints.py | 2 +-
tests/www/test_views.py | 16 ++++----
127 files changed, 280 insertions(+), 361 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index aea91e3..0eb96fd 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -609,4 +609,11 @@ repos:
entry: "./scripts/ci/pre_commit/pre_commit_in_container_bats_test.sh"
files: ^tests/bats/in_container/.*.bats$|^scripts/in_container/.*sh
pass_filenames: false
+ - id: flynt
+ name: Convert to f-strings with flynt
+ entry: flynt
+ language: python
+ language_version: python3
+ additional_dependencies: ['flynt']
+ files: \.py$
## ONLY ADD PRE-COMMITS HERE THAT REQUIRE CI IMAGE
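(Note: with the hook above registered, the conversion can be run locally in the
usual pre-commit way, assuming pre-commit is installed, e.g.:

    pre-commit run flynt --all-files
)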
diff --git a/BREEZE.rst b/BREEZE.rst
index 131f94c..f4689ba 100644
--- a/BREEZE.rst
+++ b/BREEZE.rst
@@ -2232,7 +2232,7 @@ This is the current syntax for `./breeze <./breeze>`_:
check-executables-have-shebangs check-hooks-apply check-integrations
check-merge-conflict check-xml consistent-pylint daysago-import-check
debug-statements detect-private-key doctoc dont-use-safe-filter end-of-file-fixer
- fix-encoding-pragma flake8 forbid-tabs helm-lint identity
+ fix-encoding-pragma flake8 flynt forbid-tabs helm-lint identity
incorrect-use-of-LoggingMixin insert-license isort json-schema language-matters
lint-dockerfile lint-openapi markdownlint mermaid mixed-line-ending mypy mypy-helm
no-providers-in-core-examples no-relative-imports pre-commit-descriptions
diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst
index 55674bf..1c28dd9 100644
--- a/STATIC_CODE_CHECKS.rst
+++ b/STATIC_CODE_CHECKS.rst
@@ -102,6 +102,8 @@ require Breeze Docker images to be installed locally:
----------------------------------- ---------------------------------------------------------------- ------------
``flake8`` Runs flake8. *
----------------------------------- ---------------------------------------------------------------- ------------
+``flynt`` Runs flynt.
+----------------------------------- ---------------------------------------------------------------- ------------
``forbid-tabs`` Fails if tabs are used in the project.
----------------------------------- ---------------------------------------------------------------- ------------
``helm-lint`` Verifies if helm lint passes for the chart
diff --git a/airflow/api/common/experimental/get_code.py b/airflow/api/common/experimental/get_code.py
index 99f248b..79b0b9f 100644
--- a/airflow/api/common/experimental/get_code.py
+++ b/airflow/api/common/experimental/get_code.py
@@ -32,5 +32,5 @@ def get_code(dag_id: str) -> str:
try:
return DagCode.get_code_by_fileloc(dag.fileloc)
except (OSError, DagCodeNotFound) as exception:
- error_message = "Error {} while reading Dag id {} Code".format(str(exception), dag_id)
+ error_message = f"Error {str(exception)} while reading Dag id {dag_id} Code"
raise AirflowException(error_message, exception)
diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py
index 0f1d1c7..30950ea 100644
--- a/airflow/api/common/experimental/pool.py
+++ b/airflow/api/common/experimental/pool.py
@@ -29,7 +29,7 @@ def get_pool(name, session=None):
pool = session.query(Pool).filter_by(pool=name).first()
if pool is None:
- raise PoolNotFound("Pool '%s' doesn't exist" % name)
+ raise PoolNotFound(f"Pool '{name}' doesn't exist")
return pool
@@ -49,7 +49,7 @@ def create_pool(name, slots, description, session=None):
try:
slots = int(slots)
except ValueError:
- raise AirflowBadRequest("Bad value for `slots`: %s" % slots)
+ raise AirflowBadRequest(f"Bad value for `slots`: {slots}")
# Get the length of the pool column
pool_name_length = Pool.pool.property.columns[0].type.length
@@ -81,7 +81,7 @@ def delete_pool(name, session=None):
pool = session.query(Pool).filter_by(pool=name).first()
if pool is None:
- raise PoolNotFound("Pool '%s' doesn't exist" % name)
+ raise PoolNotFound(f"Pool '{name}' doesn't exist")
session.delete(pool)
session.commit()
diff --git a/airflow/api_connexion/endpoints/connection_endpoint.py b/airflow/api_connexion/endpoints/connection_endpoint.py
index ecee686..df3bd41 100644
--- a/airflow/api_connexion/endpoints/connection_endpoint.py
+++ b/airflow/api_connexion/endpoints/connection_endpoint.py
@@ -124,4 +124,4 @@ def post_connection(session):
session.add(connection)
session.commit()
return connection_schema.dump(connection)
- raise AlreadyExists(detail="Connection already exist. ID: %s" % conn_id)
+ raise AlreadyExists(detail=f"Connection already exist. ID: {conn_id}")
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 40f8834..9b050ec 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -169,7 +169,7 @@ def set_is_paused(is_paused, args):
dag.set_is_paused(is_paused=is_paused)
- print("Dag: {}, paused: {}".format(args.dag_id, str(is_paused)))
+ print(f"Dag: {args.dag_id}, paused: {is_paused}")
def dag_show(args):
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index c2794d4..fac4c26 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -406,14 +406,11 @@ def task_render(args):
for attr in task.__class__.template_fields:
print(
textwrap.dedent(
- """\
+ f""" # ----------------------------------------------------------
+ # property: {attr}
# ----------------------------------------------------------
- # property: {}
- # ----------------------------------------------------------
- {}
- """.format(
- attr, getattr(task, attr)
- )
+ {getattr(task, attr)}
+ """
)
)
diff --git a/airflow/cli/commands/user_command.py b/airflow/cli/commands/user_command.py
index 3fd80bd..1274745 100644
--- a/airflow/cli/commands/user_command.py
+++ b/airflow/cli/commands/user_command.py
@@ -98,7 +98,7 @@ def users_manage_role(args, remove=False):
appbuilder = cached_app().appbuilder # pylint: disable=no-member
user = appbuilder.sm.find_user(username=args.username) or appbuilder.sm.find_user(email=args.email)
if not user:
- raise SystemExit('User "{}" does not exist'.format(args.username or args.email))
+ raise SystemExit(f'User "{args.username or args.email}" does not exist')
role = appbuilder.sm.find_role(args.role)
if not role:
@@ -144,7 +144,7 @@ def users_export(args):
with open(args.export, 'w') as file:
file.write(json.dumps(users, sort_keys=True, indent=4))
- print("{} users successfully exported to {}".format(len(users), file.name))
+ print(f"{len(users)} users successfully exported to {file.name}")
@cli_utils.action_logging
@@ -191,7 +191,7 @@ def _import_users(users_list): # pylint: disable=redefined-outer-name
existing_user = appbuilder.sm.find_user(email=user['email'])
if existing_user:
- print("Found existing user with email '{}'".format(user['email']))
+ print(f"Found existing user with email '{user['email']}'")
existing_user.roles = roles
existing_user.first_name = user['firstname']
existing_user.last_name = user['lastname']
@@ -206,7 +206,7 @@ def _import_users(users_list): # pylint: disable=redefined-outer-name
appbuilder.sm.update_user(existing_user)
users_updated.append(user['email'])
else:
- print("Creating new user with email '{}'".format(user['email']))
+ print(f"Creating new user with email '{user['email']}'")
appbuilder.sm.add_user(
username=user['username'],
first_name=user['firstname'],
diff --git a/airflow/cli/commands/variable_command.py b/airflow/cli/commands/variable_command.py
index 55cf94b..526f094 100644
--- a/airflow/cli/commands/variable_command.py
+++ b/airflow/cli/commands/variable_command.py
@@ -92,7 +92,7 @@ def _import_helper(filepath):
try:
Variable.set(k, v, serialize_json=not isinstance(v, str))
except Exception as e: # pylint: disable=broad-except
- print('Variable import failed: {}'.format(repr(e)))
+ print(f'Variable import failed: {repr(e)}')
fail_count += 1
else:
suc_count += 1
diff --git a/airflow/configuration.py b/airflow/configuration.py
index ecd2bc6..5b765de 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -536,7 +536,7 @@ class AirflowConfigParser(ConfigParser): # pylint: disable=too-many-ancestors
# This is based on the configparser.RawConfigParser.write method code to add support for
# reading options from environment variables.
if space_around_delimiters:
- delimiter = " {} ".format(self._delimiters[0])
+ delimiter = f" {self._delimiters[0]} "
else:
delimiter = self._delimiters[0]
if self._defaults:
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index 8eaadd7..456def2 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -52,7 +52,7 @@ def my_py_command(test_mode, params):
)
)
# Print out the value of "miff", passed in below via the Python Operator
- print(" 'miff' was passed in via task params = {}".format(params["miff"]))
+ print(f" 'miff' was passed in via task params = {params['miff']}")
return 1
@@ -83,8 +83,8 @@ def print_env_vars(test_mode):
--env-vars '{"foo":"bar"}'`
"""
if test_mode:
- print("foo={}".format(os.environ.get('foo')))
- print("AIRFLOW_TEST_MODE={}".format(os.environ.get('AIRFLOW_TEST_MODE')))
+ print(f"foo={os.environ.get('foo')}")
+ print(f"AIRFLOW_TEST_MODE={os.environ.get('AIRFLOW_TEST_MODE')}")
env_var_test_task = PythonOperator(task_id='env_var_test_task', python_callable=print_env_vars, dag=dag)
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 0355275..f431dc4 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -43,7 +43,7 @@ def run_this_func(**context):
:param context: The execution context
:type context: dict
"""
- print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"]))
+ print(f"Remotely received value of {context['dag_run'].conf['message']} for key=message")
run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)
diff --git a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py
index 6a30415..849b294 100644
--- a/airflow/example_dags/subdags/subdag.py
+++ b/airflow/example_dags/subdags/subdag.py
@@ -43,7 +43,7 @@ def subdag(parent_dag_name, child_dag_name, args):
for i in range(5):
DummyOperator(
- task_id='{}-task-{}'.format(child_dag_name, i + 1),
+ task_id=f'{child_dag_name}-task-{i + 1}',
default_args=args,
dag=dag_subdag,
)
diff --git a/airflow/example_dags/tutorial_taskflow_api_etl.py b/airflow/example_dags/tutorial_taskflow_api_etl.py
index e50ae5f..cfcfbd9 100644
--- a/airflow/example_dags/tutorial_taskflow_api_etl.py
+++ b/airflow/example_dags/tutorial_taskflow_api_etl.py
@@ -91,7 +91,7 @@ def tutorial_taskflow_api_etl():
instead of saving it to end user review, just prints it out.
"""
- print("Total order value is: %.2f" % total_order_value)
+ print(f"Total order value is: {total_order_value:.2f}")
# [END load]
diff --git a/airflow/hooks/dbapi.py b/airflow/hooks/dbapi.py
index 9e340e4..9821643 100644
--- a/airflow/hooks/dbapi.py
+++ b/airflow/hooks/dbapi.py
@@ -248,7 +248,7 @@ class DbApiHook(BaseHook):
sql = "INSERT INTO "
else:
sql = "REPLACE INTO "
- sql += "{} {} VALUES ({})".format(table, target_fields, ",".join(placeholders))
+ sql += f"{table} {target_fields} VALUES ({','.join(placeholders)})"
return sql
def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs):
diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py
index 0004cac..023cd32 100644
--- a/airflow/kubernetes/refresh_config.py
+++ b/airflow/kubernetes/refresh_config.py
@@ -61,7 +61,7 @@ class RefreshKubeConfigLoader(KubeConfigLoader):
if 'token' not in status:
logging.error('exec: missing token field in plugin output')
return None
- self.token = "Bearer %s" % status['token'] # pylint: disable=W0201
+ self.token = f"Bearer {status['token']}" # pylint: disable=W0201
ts_str = status.get('expirationTimestamp')
if ts_str:
self.api_key_expire_ts = _parse_timestamp(ts_str)
diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 1159a44..4edd6b7 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -165,7 +165,7 @@ class Connection(Base, LoggingMixin): # pylint: disable=too-many-instance-attri
def get_uri(self) -> str:
"""Return connection in URI format"""
- uri = '{}://'.format(str(self.conn_type).lower().replace('_', '-'))
+ uri = f"{str(self.conn_type).lower().replace('_', '-')}://"
authority_block = ''
if self.login is not None:
@@ -190,12 +190,12 @@ class Connection(Base, LoggingMixin): # pylint: disable=too-many-instance-attri
host_block += f'@:{self.port}'
if self.schema:
- host_block += '/{}'.format(quote(self.schema, safe=''))
+ host_block += f"/{quote(self.schema, safe='')}"
uri += host_block
if self.extra_dejson:
- uri += '?{}'.format(urlencode(self.extra_dejson))
+ uri += f'?{urlencode(self.extra_dejson)}'
return uri
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 8bb32db..15332f3 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1538,7 +1538,7 @@ class DAG(LoggingMixin):
dttm = timezone.utcnow()
pickled = pickle.dumps(self)
d['pickle_len'] = len(pickled)
- d['pickling_duration'] = "{}".format(timezone.utcnow() - dttm)
+ d['pickling_duration'] = str(timezone.utcnow() - dttm)
except Exception as e:
self.log.debug(e)
d['is_picklable'] = False
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index d671a01..3c9f53a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1595,9 +1595,7 @@ class TaskInstance(Base, LoggingMixin): # pylint: disable=R0902,R0904
yesterday_ds_nodash = yesterday_ds.replace('-', '')
tomorrow_ds_nodash = tomorrow_ds.replace('-', '')
- ti_key_str = "{dag_id}__{task_id}__{ds_nodash}".format(
- dag_id=task.dag_id, task_id=task.task_id, ds_nodash=ds_nodash
- )
+ ti_key_str = f"{task.dag_id}__{task.task_id}__{ds_nodash}"
if task.params:
params.update(task.params)
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 6428a40..844e38f 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -71,9 +71,7 @@ class BaseXCom(Base, LoggingMixin):
self.value = pickle.loads(self.value)
def __repr__(self):
- return '<XCom "{key}" ({task_id} @ {execution_date})>'.format(
- key=self.key, task_id=self.task_id, execution_date=self.execution_date
- )
+ return f'<XCom "{self.key}" ({self.task_id} @ {self.execution_date})>'
@classmethod
@provide_session
diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py
index 59c2e60..00f7e13 100644
--- a/airflow/operators/sql.py
+++ b/airflow/operators/sql.py
@@ -293,9 +293,7 @@ class SQLIntervalCheckOperator(BaseOperator):
self.days_back = -abs(days_back)
self.conn_id = conn_id
sqlexp = ", ".join(self.metrics_sorted)
- sqlt = "SELECT {sqlexp} FROM {table} WHERE {date_filter_column}=".format(
- sqlexp=sqlexp, table=table, date_filter_column=date_filter_column
- )
+ sqlt = f"SELECT {sqlexp} FROM {table} WHERE {date_filter_column}="
self.sql1 = sqlt + "'{{ ds }}'"
self.sql2 = sqlt + "'{{ macros.ds_add(ds, " + str(self.days_back) + ") }}'"
@@ -360,9 +358,7 @@ class SQLIntervalCheckOperator(BaseOperator):
ratios[k],
self.metrics_thresholds[k],
)
- raise AirflowException(
- "The following tests have failed:\n {}".format(", ".join(sorted(failed_tests)))
- )
+ raise AirflowException(f"The following tests have failed:\n {', '.join(sorted(failed_tests))}")
self.log.info("All tests have passed")
@@ -535,7 +531,7 @@ class BranchSQLOperator(BaseOperator, SkipMixin):
self._hook = self._get_hook()
if self._hook is None:
- raise AirflowException("Failed to establish connection to '%s'" % self.conn_id)
+ raise AirflowException(f"Failed to establish connection to '{self.conn_id}'")
if self.sql is None:
raise AirflowException("Expected 'sql' parameter is missing.")
@@ -584,14 +580,14 @@ class BranchSQLOperator(BaseOperator, SkipMixin):
follow_branch = self.follow_task_ids_if_true
else:
raise AirflowException(
- "Unexpected query return result '{}' type '{}'".format(query_result, type(query_result))
+ f"Unexpected query return result '{query_result}' type '{type(query_result)}'"
)
if follow_branch is None:
follow_branch = self.follow_task_ids_if_false
except ValueError:
raise AirflowException(
- "Unexpected query return result '{}' type '{}'".format(query_result, type(query_result))
+ f"Unexpected query return result '{query_result}' type '{type(query_result)}'"
)
self.skip_all_except(context["ti"], follow_branch)
diff --git a/airflow/providers/amazon/aws/hooks/datasync.py b/airflow/providers/amazon/aws/hooks/datasync.py
index 4925529..3c82749 100644
--- a/airflow/providers/amazon/aws/hooks/datasync.py
+++ b/airflow/providers/amazon/aws/hooks/datasync.py
@@ -57,7 +57,7 @@ class AWSDataSyncHook(AwsBaseHook):
self.tasks: list = []
# wait_interval_seconds = 0 is used during unit tests
if wait_interval_seconds < 0 or wait_interval_seconds > 15 * 60:
- raise ValueError("Invalid wait_interval_seconds %s" % wait_interval_seconds)
+ raise ValueError(f"Invalid wait_interval_seconds {wait_interval_seconds}")
self.wait_interval_seconds = wait_interval_seconds
def create_location(self, location_uri: str, **create_location_kwargs) -> str:
@@ -314,4 +314,4 @@ class AWSDataSyncHook(AwsBaseHook):
return False
if iterations <= 0:
raise AirflowTaskTimeout("Max iterations exceeded!")
- raise AirflowException("Unknown status: %s" % status) # Should never happen
+ raise AirflowException(f"Unknown status: {status}") # Should never happen
diff --git a/airflow/providers/amazon/aws/hooks/dynamodb.py b/airflow/providers/amazon/aws/hooks/dynamodb.py
index a829f8d..a66f2b0 100644
--- a/airflow/providers/amazon/aws/hooks/dynamodb.py
+++ b/airflow/providers/amazon/aws/hooks/dynamodb.py
@@ -58,6 +58,4 @@ class AwsDynamoDBHook(AwsBaseHook):
batch.put_item(Item=item)
return True
except Exception as general_error:
- raise AirflowException(
- "Failed to insert items in dynamodb, error: {error}".format(error=str(general_error))
- )
+ raise AirflowException(f"Failed to insert items in dynamodb, error: {str(general_error)}")
diff --git a/airflow/providers/amazon/aws/hooks/sagemaker.py b/airflow/providers/amazon/aws/hooks/sagemaker.py
index ab5fdd1..d6548ad 100644
--- a/airflow/providers/amazon/aws/hooks/sagemaker.py
+++ b/airflow/providers/amazon/aws/hooks/sagemaker.py
@@ -126,7 +126,7 @@ def secondary_training_status_message(
for transition in transitions_to_print:
message = transition['StatusMessage']
time_str = timezone.convert_to_utc(job_description['LastModifiedTime']).strftime('%Y-%m-%d %H:%M:%S')
- status_strs.append('{} {} - {}'.format(time_str, transition['Status'], message))
+ status_strs.append(f"{time_str} {transition['Status']} - {message}")
return '\n'.join(status_strs)
@@ -740,7 +740,7 @@ class SageMakerHook(AwsBaseHook): # pylint: disable=too-many-public-methods
if status in non_terminal_states:
running = True
elif status in self.failed_states:
- raise AirflowException('SageMaker job failed because %s' % response['FailureReason'])
+ raise AirflowException(f"SageMaker job failed because {response['FailureReason']}")
else:
running = False
diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py
index d6e5326..7fdeac3 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -118,7 +118,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
log_exists = self.s3_log_exists(remote_loc)
except Exception as error: # pylint: disable=broad-except
self.log.exception(error)
- log = '*** Failed to verify remote log exists {}.\n{}\n'.format(remote_loc, str(error))
+ log = f'*** Failed to verify remote log exists {remote_loc}.\n{str(error)}\n'
if log_exists:
# If S3 remote file exists, we do not fetch logs from task instance
diff --git a/airflow/providers/amazon/aws/operators/datasync.py b/airflow/providers/amazon/aws/operators/datasync.py
index fab6898..f5c129a 100644
--- a/airflow/providers/amazon/aws/operators/datasync.py
+++ b/airflow/providers/amazon/aws/operators/datasync.py
@@ -351,7 +351,7 @@ class AWSDataSyncOperator(BaseOperator):
self.log.log(level, '%s=%s', k, v)
if not result:
- raise AirflowException("Failed TaskExecutionArn %s" % self.task_execution_arn)
+ raise AirflowException(f"Failed TaskExecutionArn {self.task_execution_arn}")
def on_kill(self) -> None:
"""Cancel the submitted DataSync task."""
diff --git a/airflow/providers/amazon/aws/operators/emr_add_steps.py b/airflow/providers/amazon/aws/operators/emr_add_steps.py
index 44bc20c..2ffd5cc 100644
--- a/airflow/providers/amazon/aws/operators/emr_add_steps.py
+++ b/airflow/providers/amazon/aws/operators/emr_add_steps.py
@@ -100,7 +100,7 @@ class EmrAddStepsOperator(BaseOperator):
response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps)
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
- raise AirflowException('Adding steps failed: %s' % response)
+ raise AirflowException(f'Adding steps failed: {response}')
else:
self.log.info('Steps %s added to JobFlow', response['StepIds'])
return response['StepIds']
diff --git a/airflow/providers/amazon/aws/operators/emr_create_job_flow.py b/airflow/providers/amazon/aws/operators/emr_create_job_flow.py
index b3b6808..d8dc31e 100644
--- a/airflow/providers/amazon/aws/operators/emr_create_job_flow.py
+++ b/airflow/providers/amazon/aws/operators/emr_create_job_flow.py
@@ -78,7 +78,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
response = emr.create_job_flow(job_flow_overrides)
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
- raise AirflowException('JobFlow creation failed: %s' % response)
+ raise AirflowException(f'JobFlow creation failed: {response}')
else:
self.log.info('JobFlow with id %s created', response['JobFlowId'])
return response['JobFlowId']
diff --git a/airflow/providers/amazon/aws/operators/emr_modify_cluster.py b/airflow/providers/amazon/aws/operators/emr_modify_cluster.py
index f0e4693..a04e845 100644
--- a/airflow/providers/amazon/aws/operators/emr_modify_cluster.py
+++ b/airflow/providers/amazon/aws/operators/emr_modify_cluster.py
@@ -66,7 +66,7 @@ class EmrModifyClusterOperator(BaseOperator):
)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
- raise AirflowException('Modify cluster failed: %s' % response)
+ raise AirflowException(f'Modify cluster failed: {response}')
else:
self.log.info('Steps concurrency level %d', response['StepConcurrencyLevel'])
return response['StepConcurrencyLevel']
diff --git a/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py b/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py
index 0e7e17f..9d75eaf 100644
--- a/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py
+++ b/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py
@@ -51,6 +51,6 @@ class EmrTerminateJobFlowOperator(BaseOperator):
response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
- raise AirflowException('JobFlow termination failed: %s' % response)
+ raise AirflowException(f'JobFlow termination failed: {response}')
else:
self.log.info('JobFlow with id %s terminated', self.job_flow_id)
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py b/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py
index 53cfd93..35b0b11 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py
@@ -150,7 +150,7 @@ class SageMakerEndpointOperator(SageMakerBaseOperator):
)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
- raise AirflowException('Sagemaker endpoint creation failed: %s' % response)
+ raise AirflowException(f'Sagemaker endpoint creation failed: {response}')
else:
return {
'EndpointConfig': self.hook.describe_endpoint_config(endpoint_info['EndpointConfigName']),
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py b/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py
index bbf2be1..a2add7b 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py
@@ -49,6 +49,6 @@ class SageMakerEndpointConfigOperator(SageMakerBaseOperator):
self.log.info('Creating SageMaker Endpoint Config %s.', self.config['EndpointConfigName'])
response = self.hook.create_endpoint_config(self.config)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
- raise AirflowException('Sagemaker endpoint config creation failed: %s' % response)
+ raise AirflowException(f'Sagemaker endpoint config creation failed: {response}')
else:
return {'EndpointConfig': self.hook.describe_endpoint_config(self.config['EndpointConfigName'])}
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_model.py b/airflow/providers/amazon/aws/operators/sagemaker_model.py
index 25730ea..0e8cbf4 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_model.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_model.py
@@ -53,6 +53,6 @@ class SageMakerModelOperator(SageMakerBaseOperator):
self.log.info('Creating SageMaker Model %s.', self.config['ModelName'])
response = self.hook.create_model(self.config)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
- raise AirflowException('Sagemaker model creation failed: %s' % response)
+ raise AirflowException(f'Sagemaker model creation failed: {response}')
else:
return {'Model': self.hook.describe_model(self.config['ModelName'])}
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_processing.py b/airflow/providers/amazon/aws/operators/sagemaker_processing.py
index e56a987..271b46b 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_processing.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_processing.py
@@ -119,5 +119,5 @@ class SageMakerProcessingOperator(SageMakerBaseOperator):
max_ingestion_time=self.max_ingestion_time,
)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
- raise AirflowException('Sagemaker Processing Job creation failed: %s' % response)
+ raise AirflowException(f'Sagemaker Processing Job creation failed: {response}')
return {'Processing': self.hook.describe_processing_job(self.config['ProcessingJobName'])}
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_training.py b/airflow/providers/amazon/aws/operators/sagemaker_training.py
index 29c34f6..7d9eaf2 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_training.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_training.py
@@ -117,6 +117,6 @@ class SageMakerTrainingOperator(SageMakerBaseOperator):
max_ingestion_time=self.max_ingestion_time,
)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
- raise AirflowException('Sagemaker Training Job creation failed: %s' % response)
+ raise AirflowException(f'Sagemaker Training Job creation failed: {response}')
else:
return {'Training': self.hook.describe_training_job(self.config['TrainingJobName'])}
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_transform.py b/airflow/providers/amazon/aws/operators/sagemaker_transform.py
index 7caf9f1..b264d2d 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_transform.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_transform.py
@@ -116,7 +116,7 @@ class SageMakerTransformOperator(SageMakerBaseOperator):
max_ingestion_time=self.max_ingestion_time,
)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
- raise AirflowException('Sagemaker transform Job creation failed: %s' % response)
+ raise AirflowException(f'Sagemaker transform Job creation failed: {response}')
else:
return {
'Model': self.hook.describe_model(transform_config['ModelName']),
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_tuning.py b/airflow/providers/amazon/aws/operators/sagemaker_tuning.py
index f8df36a..38664a8 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_tuning.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_tuning.py
@@ -92,6 +92,6 @@ class SageMakerTuningOperator(SageMakerBaseOperator):
max_ingestion_time=self.max_ingestion_time,
)
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
- raise AirflowException('Sagemaker Tuning Job creation failed: %s' % response)
+ raise AirflowException(f'Sagemaker Tuning Job creation failed: {response}')
else:
return {'Tuning': self.hook.describe_tuning_job(self.config['HyperParameterTuningJobName'])}
diff --git a/airflow/providers/amazon/aws/sensors/sagemaker_base.py b/airflow/providers/amazon/aws/sensors/sagemaker_base.py
index 16c8cd7..6572122 100644
--- a/airflow/providers/amazon/aws/sensors/sagemaker_base.py
+++ b/airflow/providers/amazon/aws/sensors/sagemaker_base.py
@@ -63,7 +63,7 @@ class SageMakerBaseSensor(BaseSensorOperator):
if state in self.failed_states():
failed_reason = self.get_failed_reason_from_response(response)
- raise AirflowException('Sagemaker job failed for the following reason: %s' % failed_reason)
+ raise AirflowException(f'Sagemaker job failed for the following reason: {failed_reason}')
return True
def non_terminal_states(self) -> Set[str]:
diff --git a/airflow/providers/apache/druid/hooks/druid.py b/airflow/providers/apache/druid/hooks/druid.py
index b4af207..69d33a1 100644
--- a/airflow/providers/apache/druid/hooks/druid.py
+++ b/airflow/providers/apache/druid/hooks/druid.py
@@ -68,9 +68,7 @@ class DruidHook(BaseHook):
port = conn.port
conn_type = 'http' if not conn.conn_type else conn.conn_type
endpoint = conn.extra_dejson.get('endpoint', '')
- return "{conn_type}://{host}:{port}/{endpoint}".format(
- conn_type=conn_type, host=host, port=port, endpoint=endpoint
- )
+ return f"{conn_type}://{host}:{port}/{endpoint}"
def get_auth(self) -> Optional[requests.auth.HTTPBasicAuth]:
"""
diff --git a/airflow/providers/apache/hdfs/sensors/hdfs.py b/airflow/providers/apache/hdfs/sensors/hdfs.py
index 65cfb5b..867c193 100644
--- a/airflow/providers/apache/hdfs/sensors/hdfs.py
+++ b/airflow/providers/apache/hdfs/sensors/hdfs.py
@@ -141,7 +141,7 @@ class HdfsRegexSensor(HdfsSensor):
result = [
f
for f in sb_client.ls([self.filepath], include_toplevel=False)
- if f['file_type'] == 'f' and self.regex.match(f['path'].replace('%s/' % self.filepath, ''))
+ if f['file_type'] == 'f' and self.regex.match(f['path'].replace(f'{self.filepath}/', ''))
]
result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
result = self.filter_for_filesize(result, self.file_size)
diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py
index ab7b7b7..41f5cff 100644
--- a/airflow/providers/apache/hive/hooks/hive.py
+++ b/airflow/providers/apache/hive/hooks/hive.py
@@ -133,9 +133,7 @@ class HiveCliHook(BaseHook):
if self.use_beeline:
hive_bin = 'beeline'
- jdbc_url = "jdbc:hive2://{host}:{port}/{schema}".format(
- host=conn.host, port=conn.port, schema=conn.schema
- )
+ jdbc_url = f"jdbc:hive2://{conn.host}:{conn.port}/{conn.schema}"
if conf.get('core', 'security') == 'kerberos':
template = conn.extra_dejson.get('principal', "hive/_HOST@EXAMPLE.COM")
if "_HOST" in template:
@@ -143,9 +141,7 @@ class HiveCliHook(BaseHook):
proxy_user = self._get_proxy_user()
- jdbc_url += ";principal={template};{proxy_user}".format(
- template=template, proxy_user=proxy_user
- )
+ jdbc_url += f";principal={template};{proxy_user}"
elif self.auth:
jdbc_url += ";auth=" + self.auth
diff --git a/airflow/providers/apache/hive/operators/hive_stats.py b/airflow/providers/apache/hive/operators/hive_stats.py
index d4de591..88faa40 100644
--- a/airflow/providers/apache/hive/operators/hive_stats.py
+++ b/airflow/providers/apache/hive/operators/hive_stats.py
@@ -135,9 +135,7 @@ class HiveStatsCollectionOperator(BaseOperator):
where_clause_ = [f"{k} = '{v}'" for k, v in self.partition.items()]
where_clause = " AND\n ".join(where_clause_)
- sql = "SELECT {exprs_str} FROM {table} WHERE {where_clause};".format(
- exprs_str=exprs_str, table=self.table, where_clause=where_clause
- )
+ sql = f"SELECT {exprs_str} FROM {self.table} WHERE {where_clause};"
presto = PrestoHook(presto_conn_id=self.presto_conn_id)
self.log.info('Executing SQL check: %s', sql)
@@ -150,26 +148,22 @@ class HiveStatsCollectionOperator(BaseOperator):
self.log.info("Deleting rows from previous runs if they exist")
mysql = MySqlHook(self.mysql_conn_id)
- sql = """
+ sql = f"""
SELECT 1 FROM hive_stats
WHERE
- table_name='{table}' AND
+ table_name='{self.table}' AND
partition_repr='{part_json}' AND
- dttm='{dttm}'
+ dttm='{self.dttm}'
LIMIT 1;
- """.format(
- table=self.table, part_json=part_json, dttm=self.dttm
- )
+ """
if mysql.get_records(sql):
- sql = """
+ sql = f"""
DELETE FROM hive_stats
WHERE
- table_name='{table}' AND
+ table_name='{self.table}' AND
partition_repr='{part_json}' AND
- dttm='{dttm}';
- """.format(
- table=self.table, part_json=part_json, dttm=self.dttm
- )
+ dttm='{self.dttm}';
+ """
mysql.run(sql)
self.log.info("Pivoting and loading cells into the Airflow db")
diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py
index 7a22c66..f90cea2 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py
@@ -207,7 +207,7 @@ class SparkJDBCHook(SparkSubmitHook):
if self._jdbc_connection['url']:
arguments += [
'-url',
- "{}{}/{}".format(jdbc_conn['conn_prefix'], jdbc_conn['url'], jdbc_conn['schema']),
+ f"{jdbc_conn['conn_prefix']}{jdbc_conn['url']}/{jdbc_conn['schema']}",
]
if self._jdbc_connection['user']:
arguments += ['-user', self._jdbc_connection['user']]
diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index ac1a83a..e7f2186 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -291,7 +291,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
connection_cmd += ["--master", self._connection['master']]
for key in self._conf:
- connection_cmd += ["--conf", "{}={}".format(key, str(self._conf[key]))]
+ connection_cmd += ["--conf", f"{key}={str(self._conf[key])}"]
if self._env_vars and (self._is_kubernetes or self._is_yarn):
if self._is_yarn:
tmpl = "spark.yarn.appMasterEnv.{}={}"
@@ -308,7 +308,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
if self._is_kubernetes and self._connection['namespace']:
connection_cmd += [
"--conf",
- "spark.kubernetes.namespace={}".format(self._connection['namespace']),
+ f"spark.kubernetes.namespace={self._connection['namespace']}",
]
if self._files:
connection_cmd += ["--files", self._files]
@@ -378,9 +378,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
"/usr/bin/curl",
"--max-time",
str(curl_max_wait_time),
- "{host}/v1/submissions/status/{submission_id}".format(
- host=spark_host, submission_id=self._driver_id
- ),
+ f"{spark_host}/v1/submissions/status/{self._driver_id}",
]
self.log.info(connection_cmd)
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index cf27713..ca82918 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -29,7 +29,7 @@ def _load_body_to_dict(body):
try:
body_dict = yaml.safe_load(body)
except yaml.YAMLError as e:
- raise AirflowException("Exception when loading resource definition: %s\n" % e)
+ raise AirflowException(f"Exception when loading resource definition: {e}\n")
return body_dict
@@ -169,7 +169,7 @@ class KubernetesHook(BaseHook):
self.log.debug("Response: %s", response)
return response
except client.rest.ApiException as e:
- raise AirflowException("Exception when calling -> create_custom_object: %s\n" % e)
+ raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n")
def get_custom_object(
self, group: str, version: str, plural: str, name: str, namespace: Optional[str] = None
@@ -197,7 +197,7 @@ class KubernetesHook(BaseHook):
)
return response
except client.rest.ApiException as e:
- raise AirflowException("Exception when calling -> get_custom_object: %s\n" % e)
+ raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n")
def get_namespace(self) -> str:
"""Returns the namespace that defined in the connection"""
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 3f42ab1..7b4022e 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -324,8 +324,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance-
if len(pod_list.items) > 1 and self.reattach_on_restart:
raise AirflowException(
- 'More than one pod running with labels: '
- '{label_selector}'.format(label_selector=label_selector)
+ f'More than one pod running with labels: {label_selector}'
)
launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)
diff --git a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
index 2fa3401..eb555f1 100644
--- a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
@@ -106,7 +106,7 @@ class SparkKubernetesSensor(BaseSensorOperator):
if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES:
self._log_driver(application_state, response)
if application_state in self.FAILURE_STATES:
- raise AirflowException("Spark application failed with state: %s" % application_state)
+ raise AirflowException(f"Spark application failed with state: {application_state}")
elif application_state in self.SUCCESS_STATES:
self.log.info("Spark application ended successfully")
return True
diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py
index cc0fbc6..28953b9 100644
--- a/airflow/providers/databricks/hooks/databricks.py
+++ b/airflow/providers/databricks/hooks/databricks.py
@@ -178,7 +178,7 @@ class DatabricksHook(BaseHook): # noqa
auth = (self.databricks_conn.login, self.databricks_conn.password)
host = self.databricks_conn.host
- url = 'https://{host}/{endpoint}'.format(host=self._parse_host(host), endpoint=endpoint)
+ url = f'https://{self._parse_host(host)}/{endpoint}'
if method == 'GET':
request_func = requests.get
diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py
index 18ecf9c..1098d98 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -126,7 +126,7 @@ class DockerSwarmOperator(DockerOperator):
restart_policy=types.RestartPolicy(condition='none'),
resources=types.Resources(mem_limit=self.mem_limit),
),
- name='airflow-%s' % get_random_string(),
+ name=f'airflow-{get_random_string()}',
labels={'name': f'airflow__{self.dag_id}__{self.task_id}'},
)
diff --git a/airflow/providers/ftp/hooks/ftp.py b/airflow/providers/ftp/hooks/ftp.py
index 66a9080..6c3c064 100644
--- a/airflow/providers/ftp/hooks/ftp.py
+++ b/airflow/providers/ftp/hooks/ftp.py
@@ -180,7 +180,7 @@ class FTPHook(BaseHook):
remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
self.log.info('Retrieving file from FTP: %s', remote_full_path)
- conn.retrbinary('RETR %s' % remote_file_name, callback)
+ conn.retrbinary(f'RETR {remote_file_name}', callback)
self.log.info('Finished retrieving file from FTP: %s', remote_full_path)
if is_path and output_handle:
@@ -210,7 +210,7 @@ class FTPHook(BaseHook):
input_handle = local_full_path_or_buffer
remote_path, remote_file_name = os.path.split(remote_full_path)
conn.cwd(remote_path)
- conn.storbinary('STOR %s' % remote_file_name, input_handle)
+ conn.storbinary(f'STOR {remote_file_name}', input_handle)
if is_path:
input_handle.close()
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index b7f46d9..0b3c3b3 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -2843,7 +2843,7 @@ def _split_tablename(
cmpt = rest.split('.')
if len(cmpt) == 3:
if project_id:
- raise ValueError("{var}Use either : or . to specify project".format(var=var_print(var_name)))
+ raise ValueError(f"{var_print(var_name)}Use either : or . to specify project")
project_id = cmpt[0]
dataset_id = cmpt[1]
table_id = cmpt[2]
@@ -2887,7 +2887,7 @@ def _cleanse_time_partitioning(
def _validate_value(key: Any, value: Any, expected_type: Type) -> None:
"""Function to check expected type and raise error if type is not correct"""
if not isinstance(value, expected_type):
- raise TypeError("{} argument must have a type {} not {}".format(key, expected_type, type(value)))
+ raise TypeError(f"{key} argument must have a type {expected_type} not {type(value)}")
def _api_resource_configs_duplication_check(
diff --git a/airflow/providers/google/cloud/hooks/compute.py b/airflow/providers/google/cloud/hooks/compute.py
index ab84241..c4da00a 100644
--- a/airflow/providers/google/cloud/hooks/compute.py
+++ b/airflow/providers/google/cloud/hooks/compute.py
@@ -99,9 +99,7 @@ class ComputeEngineHook(GoogleBaseHook):
try:
operation_name = response["name"]
except KeyError:
- raise AirflowException(
- "Wrong response '{}' returned - it should contain " "'name' field".format(response)
- )
+ raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field")
self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)
@GoogleBaseHook.fallback_to_default_project_id
@@ -130,9 +128,7 @@ class ComputeEngineHook(GoogleBaseHook):
try:
operation_name = response["name"]
except KeyError:
- raise AirflowException(
- "Wrong response '{}' returned - it should contain " "'name' field".format(response)
- )
+ raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field")
self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)
@GoogleBaseHook.fallback_to_default_project_id
@@ -159,9 +155,7 @@ class ComputeEngineHook(GoogleBaseHook):
try:
operation_name = response["name"]
except KeyError:
- raise AirflowException(
- "Wrong response '{}' returned - it should contain " "'name' field".format(response)
- )
+ raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field")
self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)
def _execute_set_machine_type(self, zone: str, resource_id: str, body: dict, project_id: str) -> dict:
@@ -233,9 +227,7 @@ class ComputeEngineHook(GoogleBaseHook):
try:
operation_name = response["name"]
except KeyError:
- raise AirflowException(
- "Wrong response '{}' returned - it should contain " "'name' field".format(response)
- )
+ raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field")
self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)
@GoogleBaseHook.fallback_to_default_project_id
@@ -318,9 +310,7 @@ class ComputeEngineHook(GoogleBaseHook):
try:
operation_name = response["name"]
except KeyError:
- raise AirflowException(
- "Wrong response '{}' returned - it should contain " "'name' field".format(response)
- )
+ raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field")
self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)
def _wait_for_operation_to_complete(
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index 0a665d4..da3e49c 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -385,21 +385,19 @@ class _DataflowJobsController(LoggingMixin):
if job['currentState'] == DataflowJobStatus.JOB_STATE_DONE:
return True
elif job['currentState'] == DataflowJobStatus.JOB_STATE_FAILED:
- raise Exception("Google Cloud Dataflow job {} has failed.".format(job['name']))
+ raise Exception(f"Google Cloud Dataflow job {job['name']} has failed.")
elif job['currentState'] == DataflowJobStatus.JOB_STATE_CANCELLED:
- raise Exception("Google Cloud Dataflow job {} was cancelled.".format(job['name']))
+ raise Exception(f"Google Cloud Dataflow job {job['name']} was cancelled.")
elif job['currentState'] == DataflowJobStatus.JOB_STATE_DRAINED:
- raise Exception("Google Cloud Dataflow job {} was drained.".format(job['name']))
+ raise Exception(f"Google Cloud Dataflow job {job['name']} was drained.")
elif job['currentState'] == DataflowJobStatus.JOB_STATE_UPDATED:
- raise Exception("Google Cloud Dataflow job {} was updated.".format(job['name']))
+ raise Exception(f"Google Cloud Dataflow job {job['name']} was updated.")
elif job['currentState'] == DataflowJobStatus.JOB_STATE_RUNNING and wait_for_running:
return True
elif job['currentState'] in DataflowJobStatus.AWAITING_STATES:
return self._wait_until_finished is False
self.log.debug("Current job: %s", str(job))
- raise Exception(
- "Google Cloud Dataflow job {} was unknown state: {}".format(job["name"], job["currentState"])
- )
+ raise Exception(f"Google Cloud Dataflow job {job['name']} was unknown state: {job['currentState']}")
def wait_for_done(self) -> None:
"""Helper method to wait for result of submitted job."""
diff --git a/airflow/providers/google/cloud/hooks/functions.py b/airflow/providers/google/cloud/hooks/functions.py
index 8cb93df..49a6e2c 100644
--- a/airflow/providers/google/cloud/hooks/functions.py
+++ b/airflow/providers/google/cloud/hooks/functions.py
@@ -216,9 +216,7 @@ class CloudFunctionsHook(GoogleBaseHook):
:type project_id: str
:return: None
"""
- name = "projects/{project_id}/locations/{location}/functions/{function_id}".format(
- project_id=project_id, location=location, function_id=function_id
- )
+ name = f"projects/{project_id}/locations/{location}/functions/{function_id}"
# fmt: off
response = self.get_conn().projects().locations().functions().call( # pylint: disable=no-member
name=name,
diff --git a/airflow/providers/google/cloud/hooks/kubernetes_engine.py b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
index 930c1cd..604815d 100644
--- a/airflow/providers/google/cloud/hooks/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
@@ -99,7 +99,7 @@ class GKEHook(GoogleBaseHook):
if operation.status == Operation.Status.RUNNING or operation.status == Operation.Status.PENDING:
time.sleep(OPERATIONAL_POLL_INTERVAL)
else:
- raise exceptions.GoogleCloudError("Operation has failed with status: %s" % operation.status)
+ raise exceptions.GoogleCloudError(f"Operation has failed with status: {operation.status}")
# To update status of operation
operation = self.get_operation(operation.name, project_id=project_id or self.project_id)
return operation
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py
index 9fd456d..b57a18a 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -156,7 +156,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n'
return log, {'end_of_log': True}
except Exception as e: # pylint: disable=broad-except
- log = '*** Unable to read remote log from {}\n*** {}\n\n'.format(remote_loc, str(e))
+ log = f'*** Unable to read remote log from {remote_loc}\n*** {str(e)}\n\n'
self.log.error(log)
local_log, metadata = super()._read(ti, try_number)
log += local_log
@@ -178,7 +178,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
log = '\n'.join([old_log, log]) if old_log else log
except Exception as e: # pylint: disable=broad-except
if not hasattr(e, 'resp') or e.resp.get('status') != '404': # pylint: disable=no-member
- log = '*** Previous log discarded: {}\n\n'.format(str(e)) + log
+ log = f'*** Previous log discarded: {str(e)}\n\n' + log
self.log.info("Previous log discarded: %s", e)
try:
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index ea2c9d3..28956ec 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -729,9 +729,7 @@ class BigQueryExecuteQueryOperator(BaseOperator):
for s in self.sql
]
else:
- raise AirflowException(
- "argument 'sql' of type {} is neither a string nor an iterable".format(type(str))
- )
+ raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
context['task_instance'].xcom_push(key='job_id', value=job_id)
def on_kill(self) -> None:
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index ac93915..3bcb0aa 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1455,7 +1455,7 @@ class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator):
@staticmethod
def _generate_temp_filename(filename):
date = time.strftime('%Y%m%d%H%M%S')
- return "{}_{}_{}".format(date, str(uuid.uuid4())[:8], ntpath.basename(filename))
+ return f"{date}_{str(uuid.uuid4())[:8]}_{ntpath.basename(filename)}"
def _upload_file_temp(self, bucket, local_file):
"""Upload a local file to a Google Cloud Storage bucket."""
diff --git a/airflow/providers/jenkins/operators/jenkins_job_trigger.py b/airflow/providers/jenkins/operators/jenkins_job_trigger.py
index 69d0f63..91b8ca9 100644
--- a/airflow/providers/jenkins/operators/jenkins_job_trigger.py
+++ b/airflow/providers/jenkins/operators/jenkins_job_trigger.py
@@ -54,7 +54,7 @@ def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) -> Optio
response_headers = response.headers
if response_body is None:
raise jenkins.EmptyResponseException(
- "Error communicating with server[%s]: empty response" % jenkins_server.server
+ f"Error communicating with server[{jenkins_server.server}]: empty response"
)
return {'body': response_body.decode('utf-8'), 'headers': response_headers}
except HTTPError as e:
@@ -66,9 +66,9 @@ def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) -> Optio
else:
raise
except socket.timeout as e:
- raise jenkins.TimeoutException('Error in request: %s' % e)
+ raise jenkins.TimeoutException(f'Error in request: {e}')
except URLError as e:
- raise JenkinsException('Error in request: %s' % e.reason)
+ raise JenkinsException(f'Error in request: {e.reason}')
return None
diff --git a/airflow/providers/jira/hooks/jira.py b/airflow/providers/jira/hooks/jira.py
index 8951c12..5d186b8 100644
--- a/airflow/providers/jira/hooks/jira.py
+++ b/airflow/providers/jira/hooks/jira.py
@@ -82,8 +82,8 @@ class JiraHook(BaseHook):
proxies=self.proxies,
)
except JIRAError as jira_error:
- raise AirflowException('Failed to create jira client, jira error: %s' % str(jira_error))
+ raise AirflowException(f'Failed to create jira client, jira error: {str(jira_error)}')
except Exception as e:
- raise AirflowException('Failed to create jira client, error: %s' % str(e))
+ raise AirflowException(f'Failed to create jira client, error: {str(e)}')
return self.client
diff --git a/airflow/providers/jira/operators/jira.py b/airflow/providers/jira/operators/jira.py
index 3550d1f..1e9530c 100644
--- a/airflow/providers/jira/operators/jira.py
+++ b/airflow/providers/jira/operators/jira.py
@@ -89,6 +89,6 @@ class JiraOperator(BaseOperator):
return jira_result
except JIRAError as jira_error:
- raise AirflowException("Failed to execute jiraOperator, error: %s" % str(jira_error))
+ raise AirflowException(f"Failed to execute jiraOperator, error: {str(jira_error)}")
except Exception as e:
- raise AirflowException("Jira operator error: %s" % str(e))
+ raise AirflowException(f"Jira operator error: {str(e)}")
diff --git a/airflow/providers/microsoft/azure/operators/azure_container_instances.py b/airflow/providers/microsoft/azure/operators/azure_container_instances.py
index 74418ac..70355ed 100644
--- a/airflow/providers/microsoft/azure/operators/azure_container_instances.py
+++ b/airflow/providers/microsoft/azure/operators/azure_container_instances.py
@@ -278,7 +278,7 @@ class AzureContainerInstancesOperator(BaseOperator):
self.log.info("Container had exit code: %s", exit_code)
if exit_code != 0:
- raise AirflowException("Container had a non-zero exit code, %s" % exit_code)
+ raise AirflowException(f"Container had a non-zero exit code, {exit_code}")
return exit_code
except CloudError:
diff --git a/airflow/providers/microsoft/winrm/operators/winrm.py b/airflow/providers/microsoft/winrm/operators/winrm.py
index 5500e3d..55d65cf 100644
--- a/airflow/providers/microsoft/winrm/operators/winrm.py
+++ b/airflow/providers/microsoft/winrm/operators/winrm.py
@@ -129,7 +129,7 @@ class WinRMOperator(BaseOperator):
self.winrm_hook.winrm_protocol.close_shell(winrm_client) # type: ignore[attr-defined]
except Exception as e:
- raise AirflowException("WinRM operator error: {}".format(str(e)))
+ raise AirflowException(f"WinRM operator error: {str(e)}")
if return_code == 0:
# returning output if do_xcom_push is set
diff --git a/airflow/providers/mysql/hooks/mysql.py b/airflow/providers/mysql/hooks/mysql.py
index 5da0be2..7d1f71f 100644
--- a/airflow/providers/mysql/hooks/mysql.py
+++ b/airflow/providers/mysql/hooks/mysql.py
@@ -164,12 +164,10 @@ class MySqlHook(DbApiHook):
conn = self.get_conn()
cur = conn.cursor()
cur.execute(
- """
+ f"""
LOAD DATA LOCAL INFILE '{tmp_file}'
INTO TABLE {table}
- """.format(
- tmp_file=tmp_file, table=table
- )
+ """
)
conn.commit()
@@ -178,12 +176,10 @@ class MySqlHook(DbApiHook):
conn = self.get_conn()
cur = conn.cursor()
cur.execute(
- """
+ f"""
SELECT * INTO OUTFILE '{tmp_file}'
FROM {table}
- """.format(
- tmp_file=tmp_file, table=table
- )
+ """
)
conn.commit()
@@ -251,17 +247,12 @@ class MySqlHook(DbApiHook):
cursor = conn.cursor()
cursor.execute(
- """
+ f"""
LOAD DATA LOCAL INFILE '{tmp_file}'
{duplicate_key_handling}
INTO TABLE {table}
{extra_options}
- """.format(
- tmp_file=tmp_file,
- table=table,
- duplicate_key_handling=duplicate_key_handling,
- extra_options=extra_options,
- )
+ """
)
cursor.close()
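flynt folds the .format() keyword arguments of these triple-quoted SQL literals straight into the text, so the rendered statement is byte-for-byte unchanged; a quick check under illustrative values (not the hook's real arguments):

tmp_file, table = '/tmp/data.tsv', 'users'
old = """
    LOAD DATA LOCAL INFILE '{tmp_file}'
    INTO TABLE {table}
""".format(tmp_file=tmp_file, table=table)
new = f"""
    LOAD DATA LOCAL INFILE '{tmp_file}'
    INTO TABLE {table}
"""
assert old == new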
diff --git a/airflow/providers/opsgenie/hooks/opsgenie_alert.py b/airflow/providers/opsgenie/hooks/opsgenie_alert.py
index aad834c..60f1734 100644
--- a/airflow/providers/opsgenie/hooks/opsgenie_alert.py
+++ b/airflow/providers/opsgenie/hooks/opsgenie_alert.py
@@ -82,5 +82,5 @@ class OpsgenieAlertHook(HttpHook):
return self.run(
endpoint='v2/alerts',
data=json.dumps(payload),
- headers={'Content-Type': 'application/json', 'Authorization': 'GenieKey %s' % api_key},
+ headers={'Content-Type': 'application/json', 'Authorization': f'GenieKey {api_key}'},
)
diff --git a/airflow/providers/oracle/hooks/oracle.py b/airflow/providers/oracle/hooks/oracle.py
index bfb40a4..1c5c536 100644
--- a/airflow/providers/oracle/hooks/oracle.py
+++ b/airflow/providers/oracle/hooks/oracle.py
@@ -178,7 +178,7 @@ class OracleHook(DbApiHook):
else:
lst.append(str(cell))
values = tuple(lst)
- sql = 'INSERT /*+ APPEND */ INTO {} {} VALUES ({})'.format(table, target_fields, ','.join(values))
+ sql = f"INSERT /*+ APPEND */ INTO {table} {target_fields} VALUES ({','.join(values)})"
cur.execute(sql)
if i % commit_every == 0:
conn.commit() # type: ignore[attr-defined]
diff --git a/airflow/providers/pagerduty/hooks/pagerduty.py b/airflow/providers/pagerduty/hooks/pagerduty.py
index 4de43df..e42357a 100644
--- a/airflow/providers/pagerduty/hooks/pagerduty.py
+++ b/airflow/providers/pagerduty/hooks/pagerduty.py
@@ -145,7 +145,7 @@ class PagerdutyHook(BaseHook):
actions = ('trigger', 'acknowledge', 'resolve')
if action not in actions:
- raise ValueError("Event action must be one of: %s" % ', '.join(actions))
+ raise ValueError(f"Event action must be one of: {', '.join(actions)}")
data = {
"event_action": action,
"payload": payload,
diff --git a/airflow/providers/plexus/operators/job.py b/airflow/providers/plexus/operators/job.py
index ece5df6..8f56987 100644
--- a/airflow/providers/plexus/operators/job.py
+++ b/airflow/providers/plexus/operators/job.py
@@ -149,9 +149,7 @@ class PlexusJobOperator(BaseOperator):
"""
missing_params = self.required_params - set(self.job_params)
if len(missing_params) > 0:
- raise AirflowException(
- "Missing the following required job_params: {}".format(", ".join(missing_params))
- )
+ raise AirflowException(f"Missing the following required job_params: {', '.join(missing_params)}")
params = {}
for prm in self.job_params:
if prm in self.lookups:
diff --git a/airflow/providers/postgres/hooks/postgres.py b/airflow/providers/postgres/hooks/postgres.py
index 0f38823..cd80bd9 100644
--- a/airflow/providers/postgres/hooks/postgres.py
+++ b/airflow/providers/postgres/hooks/postgres.py
@@ -223,7 +223,7 @@ class PostgresHook(DbApiHook):
else:
target_fields_fragment = ''
- sql = "INSERT INTO {} {} VALUES ({})".format(table, target_fields_fragment, ",".join(placeholders))
+ sql = f"INSERT INTO {table} {target_fields_fragment} VALUES ({','.join(placeholders)})"
if replace:
if target_fields is None:
diff --git a/airflow/providers/qubole/hooks/qubole.py b/airflow/providers/qubole/hooks/qubole.py
index 0cc3832..6655de5 100644
--- a/airflow/providers/qubole/hooks/qubole.py
+++ b/airflow/providers/qubole/hooks/qubole.py
@@ -262,7 +262,7 @@ class QuboleHook(BaseHook):
for key, value in self.kwargs.items(): # pylint: disable=too-many-nested-blocks
if key in COMMAND_ARGS[cmd_type]:
if key in HYPHEN_ARGS:
- args.append("--{}={}".format(key.replace('_', '-'), value))
+ args.append(f"--{key.replace('_', '-')}={value}")
elif key in positional_args_list:
inplace_args = value
elif key == 'tags':
@@ -273,7 +273,7 @@ class QuboleHook(BaseHook):
else:
args.append(f"--{key}={value}")
- args.append("--tags={}".format(','.join(filter(None, tags))))
+ args.append(f"--tags={','.join(filter(None, tags))}")
if inplace_args is not None:
args += inplace_args.split(' ')
diff --git a/airflow/providers/salesforce/hooks/salesforce.py b/airflow/providers/salesforce/hooks/salesforce.py
index aeaef5d..d76baac 100644
--- a/airflow/providers/salesforce/hooks/salesforce.py
+++ b/airflow/providers/salesforce/hooks/salesforce.py
@@ -146,7 +146,7 @@ class SalesforceHook(BaseHook):
:return: all instances of the object from Salesforce.
:rtype: dict
"""
- query = "SELECT {} FROM {}".format(",".join(fields), obj)
+ query = f"SELECT {','.join(fields)} FROM {obj}"
self.log.info(
"Making query to Salesforce: %s",
diff --git a/airflow/providers/sftp/operators/sftp.py b/airflow/providers/sftp/operators/sftp.py
index 39cc14b..f137352 100644
--- a/airflow/providers/sftp/operators/sftp.py
+++ b/airflow/providers/sftp/operators/sftp.py
@@ -154,7 +154,7 @@ class SFTPOperator(BaseOperator):
sftp_client.put(self.local_filepath, self.remote_filepath, confirm=self.confirm)
except Exception as e:
- raise AirflowException("Error while transferring {}, error: {}".format(file_msg, str(e)))
+ raise AirflowException(f"Error while transferring {file_msg}, error: {str(e)}")
return self.local_filepath
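As elsewhere in this diff, flynt keeps the explicit str() call, although f-strings apply str() implicitly, so {str(e)} and {e} render identically; sketch with an illustrative exception:

e = OSError('No such file')
assert f"Error while transferring f.txt, error: {str(e)}" == \
    f"Error while transferring f.txt, error: {e}"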
diff --git a/airflow/providers/ssh/operators/ssh.py b/airflow/providers/ssh/operators/ssh.py
index 14cec96..32ac1f2 100644
--- a/airflow/providers/ssh/operators/ssh.py
+++ b/airflow/providers/ssh/operators/ssh.py
@@ -168,7 +168,7 @@ class SSHOperator(BaseOperator):
raise AirflowException(f"error running cmd: {self.command}, error: {error_msg}")
except Exception as e:
- raise AirflowException("SSH operator error: {}".format(str(e)))
+ raise AirflowException(f"SSH operator error: {str(e)}")
return True
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index 87c269d..f1ddfdf 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -56,7 +56,7 @@ def renew_from_kt(principal: str, keytab: str, exit_on_fail: bool = True):
"""
# The config is specified in seconds. But we ask for that same amount in
# minutes to give ourselves a large renewal buffer.
- renewal_lifetime = "%sm" % conf.getint('kerberos', 'reinit_frequency')
+ renewal_lifetime = f"{conf.getint('kerberos', 'reinit_frequency')}m"
cmd_principal = principal or conf.get('kerberos', 'principal').replace("_HOST", socket.getfqdn())
@@ -128,7 +128,7 @@ def perform_krb181_workaround(principal: str):
ret = subprocess.call(cmdv, close_fds=True)
if ret != 0:
- principal = "{}/{}".format(principal or conf.get('kerberos', 'principal'), socket.getfqdn())
+ principal = f"{principal or conf.get('kerberos', 'principal')}/{socket.getfqdn()}"
princ = principal
ccache = conf.get('kerberos', 'principal')
log.error(
diff --git a/airflow/security/utils.py b/airflow/security/utils.py
index e5ceadb..ca203b5 100644
--- a/airflow/security/utils.py
+++ b/airflow/security/utils.py
@@ -55,7 +55,7 @@ def replace_hostname_pattern(components, host=None):
fqdn = host
if not fqdn or fqdn == '0.0.0.0':
fqdn = get_hostname()
- return '{}/{}@{}'.format(components[0], fqdn.lower(), components[2])
+ return f'{components[0]}/{fqdn.lower()}@{components[2]}'
def get_fqdn(hostname_or_ip=None):
diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py
index 9e997bc..4a6f11b 100644
--- a/airflow/sensors/date_time.py
+++ b/airflow/sensors/date_time.py
@@ -65,7 +65,7 @@ class DateTimeSensor(BaseSensorOperator):
self.target_time = target_time
else:
raise TypeError(
- "Expected str or datetime.datetime type for target_time. Got {}".format(type(target_time))
+ f"Expected str or datetime.datetime type for target_time. Got {type(target_time)}"
)
def poke(self, context: Dict) -> bool:
diff --git a/airflow/sensors/sql.py b/airflow/sensors/sql.py
index 573c7cd..923af6c 100644
--- a/airflow/sensors/sql.py
+++ b/airflow/sensors/sql.py
@@ -89,7 +89,7 @@ class SqlSensor(BaseSensorOperator):
if conn.conn_type not in allowed_conn_type:
raise AirflowException(
"The connection type is not supported by SqlSensor. "
- + "Supported connection types: {}".format(list(allowed_conn_type))
+ + f"Supported connection types: {list(allowed_conn_type)}"
)
return conn.get_hook()
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 68a0b44..600a693 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -265,7 +265,7 @@ def sigquit_handler(sig, frame): # pylint: disable=unused-argument
id_to_name = {th.ident: th.name for th in threading.enumerate()}
code = []
for thread_id, stack in sys._current_frames().items(): # pylint: disable=protected-access
- code.append("\n# Thread: {}({})".format(id_to_name.get(thread_id, ""), thread_id))
+ code.append(f"\n# Thread: {id_to_name.get(thread_id, '')}({thread_id})")
for filename, line_number, name, line in traceback.extract_stack(stack):
code.append(f'File: "{filename}", line {line_number}, in {name}')
if line:
diff --git a/airflow/utils/code_utils.py b/airflow/utils/code_utils.py
index 77cfa42..53b2db1 100644
--- a/airflow/utils/code_utils.py
+++ b/airflow/utils/code_utils.py
@@ -50,7 +50,7 @@ def get_python_source(x: Any) -> Optional[str]:
pass
if source_code is None:
- source_code = 'No source code available for {}'.format(type(x))
+ source_code = f'No source code available for {type(x)}'
return source_code
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6f04cbf..7617bda 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -120,7 +120,7 @@ class FileTaskHandler(logging.Handler):
log += "".join(file.readlines())
except Exception as e: # pylint: disable=broad-except
log = f"*** Failed to load local log file: {location}\n"
- log += "*** {}\n".format(str(e))
+ log += f"*** {str(e)}\n"
elif conf.get('core', 'executor') == 'KubernetesExecutor': # pylint: disable=too-many-nested-blocks
try:
from airflow.kubernetes.kube_client import get_kube_client
@@ -158,7 +158,7 @@ class FileTaskHandler(logging.Handler):
log += line.decode()
except Exception as f: # pylint: disable=broad-except
- log += '*** Unable to fetch logs from worker pod {} ***\n{}\n\n'.format(ti.hostname, str(f))
+ log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n'
else:
url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path).format(
ti=ti, worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')
@@ -180,7 +180,7 @@ class FileTaskHandler(logging.Handler):
log += '\n' + response.text
except Exception as e: # pylint: disable=broad-except
- log += "*** Failed to fetch log file from worker. {}\n".format(str(e))
+ log += f"*** Failed to fetch log file from worker. {str(e)}\n"
return log, {'end_of_log': True}
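Note the one .format() call left untouched above: it is applied to the result of os.path.join rather than directly to a string literal, and flynt only rewrites formatting attached to literals, so indirect templates like this keep .format(); a sketch of the pattern (names are illustrative):

template = "http://{host}:{port}/log"
full = template + "/dag/task/1.log"           # literal no longer adjacent to .format()
url = full.format(host="worker-1", port=8793)  # rendered on the computed string
assert url == "http://worker-1:8793/log/dag/task/1.log"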
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index d302cbe..2b2521a 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -109,7 +109,7 @@ def make_aware(value, timezone=None):
# Check that we won't overwrite the timezone of an aware datetime.
if is_localized(value):
- raise ValueError("make_aware expects a naive datetime, got %s" % value)
+ raise ValueError(f"make_aware expects a naive datetime, got {value}")
if hasattr(value, 'fold'):
# In case of python 3.6 we want to do the same that pendulum does for python3.5
# i.e in case we move clock back we want to schedule the run at the time of the second
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 265a12f..09ebe9a 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -128,14 +128,14 @@ def generate_pages(current_page, num_of_pages, search=None, status=None, window=
is_disabled = 'disabled' if current_page <= 0 else ''
output.append(
first_node.format(
- href_link="?{}".format(get_params(page=0, search=search, status=status)), # noqa
+ href_link=f"?{get_params(page=0, search=search, status=status)}", # noqa
disabled=is_disabled,
)
)
page_link = void_link
if current_page > 0:
- page_link = '?{}'.format(get_params(page=(current_page - 1), search=search, status=status))
+ page_link = f'?{get_params(page=current_page - 1, search=search, status=status)}'
output.append(previous_node.format(href_link=page_link, disabled=is_disabled)) # noqa
@@ -157,7 +157,7 @@ def generate_pages(current_page, num_of_pages, search=None, status=None, window=
'is_active': 'active' if is_current(current_page, page) else '',
'href_link': void_link
if is_current(current_page, page)
- else '?{}'.format(get_params(page=page, search=search, status=status)),
+ else f'?{get_params(page=page, search=search, status=status)}',
'page_num': page + 1,
}
output.append(page_node.format(**vals)) # noqa
@@ -167,13 +167,13 @@ def generate_pages(current_page, num_of_pages, search=None, status=None, window=
page_link = (
void_link
if current_page >= num_of_pages - 1
- else '?{}'.format(get_params(page=current_page + 1, search=search, status=status))
+ else f'?{get_params(page=current_page + 1, search=search, status=status)}'
)
output.append(next_node.format(href_link=page_link, disabled=is_disabled)) # noqa
output.append(
last_node.format(
- href_link="?{}".format(get_params(page=last_page, search=search, status=status)), # noqa
+ href_link=f"?{get_params(page=last_page, search=search, status=status)}", # noqa
disabled=is_disabled,
)
)
diff --git a/airflow/www/validators.py b/airflow/www/validators.py
index 9699a47..fe35dcb 100644
--- a/airflow/www/validators.py
+++ b/airflow/www/validators.py
@@ -37,7 +37,7 @@ class GreaterEqualThan(EqualTo):
try:
other = form[self.fieldname]
except KeyError:
- raise ValidationError(field.gettext("Invalid field name '%s'." % self.fieldname))
+ raise ValidationError(field.gettext(f"Invalid field name '{self.fieldname}'."))
if field.data is None or other.data is None:
return
@@ -50,7 +50,7 @@ class GreaterEqualThan(EqualTo):
message = self.message
if message is None:
message = field.gettext(
- 'Field must be greater than or equal to %(other_label)s.' % message_args
+ f"Field must be greater than or equal to {message_args['other_label']}."
)
else:
message = message % message_args
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 1c00951..93ffc38 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1639,7 +1639,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
new_dag_state = set_dag_run_state_to_failed(dag, execution_date, commit=confirmed)
if confirmed:
- flash('Marked failed on {} task instances'.format(len(new_dag_state)))
+ flash(f'Marked failed on {len(new_dag_state)} task instances')
return redirect(origin)
else:
@@ -1668,7 +1668,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
new_dag_state = set_dag_run_state_to_success(dag, execution_date, commit=confirmed)
if confirmed:
- flash('Marked success on {} task instances'.format(len(new_dag_state)))
+ flash(f'Marked success on {len(new_dag_state)} task instances')
return redirect(origin)
else:
@@ -1752,7 +1752,7 @@ class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-m
commit=True,
)
- flash("Marked {} on {} task instances".format(state, len(altered)))
+ flash(f"Marked {state} on {len(altered)} task instances")
return redirect(origin)
to_be_altered = set_state(
@@ -3410,10 +3410,7 @@ class DagRunModelView(AirflowModelView):
cleared_ti_count += len(tis)
models.clear_task_instances(tis, session, dag=dag)
- flash(
- "{count} dag runs and {altered_ti_count} task instances "
- "were cleared".format(count=count, altered_ti_count=cleared_ti_count)
- )
+ flash(f"{count} dag runs and {cleared_ti_count} task instances were cleared")
except Exception: # noqa pylint: disable=broad-except
flash('Failed to clear state', 'error')
return redirect(self.get_default_url())
@@ -3636,7 +3633,7 @@ class TaskInstanceModelView(AirflowModelView):
models.clear_task_instances(task_instances_list, session, dag=dag)
session.commit()
- flash("{} task instances have been cleared".format(len(task_instances)))
+ flash(f"{len(task_instances)} task instances have been cleared")
self.update_redirect()
return redirect(self.get_redirect())
except Exception as e: # noqa pylint: disable=broad-except
@@ -3651,11 +3648,7 @@ class TaskInstanceModelView(AirflowModelView):
for ti in tis:
ti.set_state(target_state, session)
session.commit()
- flash(
- "{count} task instances were set to '{target_state}'".format(
- count=count, target_state=target_state
- )
- )
+ flash(f"{count} task instances were set to '{target_state}'")
except Exception: # noqa pylint: disable=broad-except
flash('Failed to set state', 'error')
diff --git a/breeze-complete b/breeze-complete
index 63b34ad..5562cee 100644
--- a/breeze-complete
+++ b/breeze-complete
@@ -97,6 +97,7 @@ dont-use-safe-filter
end-of-file-fixer
fix-encoding-pragma
flake8
+flynt
forbid-tabs
helm-lint
identity
diff --git a/docs/exts/airflow_intersphinx.py b/docs/exts/airflow_intersphinx.py
index ee83b8f..a3bd262 100644
--- a/docs/exts/airflow_intersphinx.py
+++ b/docs/exts/airflow_intersphinx.py
@@ -150,7 +150,7 @@ if __name__ == "__main__":
except ValueError as exc:
print(exc.args[0] % exc.args[1:])
except Exception as exc: # pylint: disable=broad-except
- print('Unknown error: %r' % exc)
+ print(f'Unknown error: {exc!r}')
provider_mapping = _generate_provider_intersphinx_mapping()
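The %r directive maps onto the f-string !r conversion, which formats with repr() rather than str(); equivalence check:

exc = ValueError("boom")
assert ('Unknown error: %r' % exc) == f'Unknown error: {exc!r}' == "Unknown error: ValueError('boom')"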
diff --git a/docs/exts/exampleinclude.py b/docs/exts/exampleinclude.py
index 141ca82..815eb21 100644
--- a/docs/exts/exampleinclude.py
+++ b/docs/exts/exampleinclude.py
@@ -140,7 +140,7 @@ def register_source(app, env, modname):
"""
entry = env._viewcode_modules.get(modname, None)
if entry is False:
- print("[%s] Entry is false for " % modname)
+ print(f"[{modname}] Entry is false for ")
return False
code_tags = app.emit_firstresult("viewcode-find-source", modname)
diff --git a/docs/exts/redirects.py b/docs/exts/redirects.py
index 7874124..20603c4 100644
--- a/docs/exts/redirects.py
+++ b/docs/exts/redirects.py
@@ -54,7 +54,7 @@ def generate_redirects(app):
from_path = from_path.replace(in_suffix, '.html')
to_path = to_path.replace(in_suffix, ".html")
- to_path_prefix = "..%s" % os.path.sep * (len(from_path.split(os.path.sep)) - 1)
+ to_path_prefix = f"..{os.path.sep}" * (len(from_path.split(os.path.sep)) - 1)
to_path = to_path_prefix + to_path
log.debug("Resolved redirect '%s' to '%s'", from_path, to_path)
diff --git a/metastore_browser/hive_metastore.py b/metastore_browser/hive_metastore.py
index 58d4da3..462f245 100644
--- a/metastore_browser/hive_metastore.py
+++ b/metastore_browser/hive_metastore.py
@@ -95,7 +95,7 @@ class MetastoreBrowserView(BaseView):
def partitions(self):
"""Retrieve table partitions"""
schema, table = request.args.get("table").split('.')
- sql = """
+ sql = f"""
SELECT
a.PART_NAME,
a.CREATE_TIME,
@@ -111,9 +111,7 @@ class MetastoreBrowserView(BaseView):
b.TBL_NAME like '{table}' AND
d.NAME like '{schema}'
ORDER BY PART_NAME DESC
- """.format(
- table=table, schema=schema
- )
+ """
hook = MySqlHook(METASTORE_MYSQL_CONN_ID)
df = hook.get_pandas_df(sql)
return df.to_html(
@@ -133,7 +131,7 @@ class MetastoreBrowserView(BaseView):
if DB_DENY_LIST:
dbs = ",".join(["'" + db + "'" for db in DB_DENY_LIST])
where_clause = f"AND b.name NOT IN ({dbs})"
- sql = """
+ sql = f"""
SELECT CONCAT(b.NAME, '.', a.TBL_NAME), TBL_TYPE
FROM TBLS a
JOIN DBS b ON a.DB_ID = b.DB_ID
@@ -143,10 +141,8 @@ class MetastoreBrowserView(BaseView):
b.NAME NOT LIKE '%tmp%' AND
b.NAME NOT LIKE '%temp%'
{where_clause}
- LIMIT {LIMIT};
- """.format(
- where_clause=where_clause, LIMIT=TABLE_SELECTOR_LIMIT
- )
+ LIMIT {TABLE_SELECTOR_LIMIT};
+ """
hook = MySqlHook(METASTORE_MYSQL_CONN_ID)
data = [{'id': row[0], 'text': row[0]} for row in hook.get_records(sql)]
return json.dumps(data)
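A side benefit for SQL literals like the ones above: the % wildcards in LIKE clauses need no escaping inside an f-string, whereas %-formatting would require doubling them; sketch with an illustrative query:

table = 'TBLS'
sql = f"SELECT TBL_NAME FROM {table} WHERE TBL_NAME NOT LIKE '%tmp%'"
assert sql == "SELECT TBL_NAME FROM %s WHERE TBL_NAME NOT LIKE '%%tmp%%'" % table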
diff --git a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
index 038e027..688f767 100755
--- a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
+++ b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
@@ -98,16 +98,16 @@ def assert_sets_equal(set1, set2):
try:
difference1 = set1.difference(set2)
except TypeError as e:
- raise AssertionError('invalid type when attempting set difference: %s' % e)
+ raise AssertionError(f'invalid type when attempting set difference: {e}')
except AttributeError as e:
- raise AssertionError('first argument does not support set difference: %s' % e)
+ raise AssertionError(f'first argument does not support set difference: {e}')
try:
difference2 = set2.difference(set1)
except TypeError as e:
- raise AssertionError('invalid type when attempting set difference: %s' % e)
+ raise AssertionError(f'invalid type when attempting set difference: {e}')
except AttributeError as e:
- raise AssertionError('second argument does not support set difference: %s' % e)
+ raise AssertionError(f'second argument does not support set difference: {e}')
if not (difference1 or difference2):
return
diff --git a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
index 6558901..0480286 100755
--- a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
+++ b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
@@ -121,7 +121,7 @@ def _write_option(configfile, idx, option):
if option["example"]:
if not str(option["name"]).endswith("_template"):
option["example"] = option["example"].replace("{", "{{").replace("}", "}}")
- configfile.write("# Example: {} = {}\n".format(option["name"], option["example"]))
+ configfile.write(f"# Example: {option['name']} = {option['example']}\n")
if option["default"] is not None:
if not isinstance(option["default"], str):
@@ -134,9 +134,9 @@ def _write_option(configfile, idx, option):
value = " " + option["default"]
else:
value = ""
- configfile.write("{} ={}\n".format(option["name"], value))
+ configfile.write(f"{option['name']} ={value}\n")
else:
- configfile.write("# {} =\n".format(option["name"]))
+ configfile.write(f"# {option['name']} =\n")
if __name__ == '__main__':
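These conversions switch to single-quoted subscripts inside double-quoted f-strings because, on the Python versions current at the time of this commit (before PEP 701 in 3.12), reusing the outer quote inside a replacement field is a syntax error; sketch with illustrative option values:

option = {"name": "sql_alchemy_conn", "example": "postgresql://user@host/db"}
line = f"# Example: {option['name']} = {option['example']}\n"
assert line == "# Example: sql_alchemy_conn = postgresql://user@host/db\n"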
diff --git a/tests/api/common/experimental/test_pool.py b/tests/api/common/experimental/test_pool.py
index ae00226..3c75a14 100644
--- a/tests/api/common/experimental/test_pool.py
+++ b/tests/api/common/experimental/test_pool.py
@@ -39,7 +39,7 @@ class TestPool(unittest.TestCase):
clear_db_pools()
self.pools = [Pool.get_default_pool()]
for i in range(self.USER_POOL_COUNT):
- name = 'experimental_%s' % (i + 1)
+ name = f'experimental_{i + 1}'
pool = models.Pool(
pool=name,
slots=i,
diff --git a/tests/cli/commands/test_connection_command.py b/tests/cli/commands/test_connection_command.py
index ae78892..c81ff81 100644
--- a/tests/cli/commands/test_connection_command.py
+++ b/tests/cli/commands/test_connection_command.py
@@ -496,7 +496,7 @@ class TestCliAddConnections(unittest.TestCase):
"connections",
"add",
"new0",
- "--conn-uri=%s" % TEST_URL,
+ f"--conn-uri={TEST_URL}",
"--conn-description=new0 description",
],
"Successfully added `conn_id`=new0 : postgresql://airflow:airflow@host:5432/airflow",
@@ -516,7 +516,7 @@ class TestCliAddConnections(unittest.TestCase):
"connections",
"add",
"new1",
- "--conn-uri=%s" % TEST_URL,
+ f"--conn-uri={TEST_URL}",
"--conn-description=new1 description",
],
"Successfully added `conn_id`=new1 : postgresql://airflow:airflow@host:5432/airflow",
@@ -536,7 +536,7 @@ class TestCliAddConnections(unittest.TestCase):
"connections",
"add",
"new2",
- "--conn-uri=%s" % TEST_URL,
+ f"--conn-uri={TEST_URL}",
"--conn-extra",
"{'extra': 'yes'}",
],
@@ -557,7 +557,7 @@ class TestCliAddConnections(unittest.TestCase):
"connections",
"add",
"new3",
- "--conn-uri=%s" % TEST_URL,
+ f"--conn-uri={TEST_URL}",
"--conn-extra",
"{'extra': 'yes'}",
"--conn-description",
@@ -651,12 +651,12 @@ class TestCliAddConnections(unittest.TestCase):
def test_cli_connections_add_duplicate(self):
conn_id = "to_be_duplicated"
connection_command.connections_add(
- self.parser.parse_args(["connections", "add", conn_id, "--conn-uri=%s" % TEST_URL])
+ self.parser.parse_args(["connections", "add", conn_id, f"--conn-uri={TEST_URL}"])
)
# Check for addition attempt
with pytest.raises(SystemExit, match=rf"A connection with `conn_id`={conn_id} already exists"):
connection_command.connections_add(
- self.parser.parse_args(["connections", "add", conn_id, "--conn-uri=%s" % TEST_URL])
+ self.parser.parse_args(["connections", "add", conn_id, f"--conn-uri={TEST_URL}"])
)
def test_cli_connections_add_delete_with_missing_parameters(self):
@@ -671,7 +671,7 @@ class TestCliAddConnections(unittest.TestCase):
# Attempt to add with invalid uri
with pytest.raises(SystemExit, match=r"The URI provided to --conn-uri is invalid: nonsense_uri"):
connection_command.connections_add(
- self.parser.parse_args(["connections", "add", "new1", "--conn-uri=%s" % "nonsense_uri"])
+ self.parser.parse_args(["connections", "add", "new1", f"--conn-uri={'nonsense_uri'}"])
)
diff --git a/tests/core/test_core.py b/tests/core/test_core.py
index fae2c73..f6dd6ee 100644
--- a/tests/core/test_core.py
+++ b/tests/core/test_core.py
@@ -172,7 +172,7 @@ class TestCore(unittest.TestCase):
op = BashOperator(
task_id='test_bash_operator_kill',
execution_timeout=timedelta(seconds=1),
- bash_command="/bin/bash -c 'sleep %s'" % sleep_time,
+ bash_command=f"/bin/bash -c 'sleep {sleep_time}'",
dag=self.dag,
)
with pytest.raises(AirflowTaskTimeout):
diff --git a/tests/dags/test_subdag.py b/tests/dags/test_subdag.py
index e45b2a1..f2227dc 100644
--- a/tests/dags/test_subdag.py
+++ b/tests/dags/test_subdag.py
@@ -48,7 +48,7 @@ def subdag(parent_dag_name, child_dag_name, args):
for i in range(2):
DummyOperator(
- task_id='{}-task-{}'.format(child_dag_name, i + 1),
+ task_id=f'{child_dag_name}-task-{i + 1}',
default_args=args,
dag=dag_subdag,
)
diff --git a/tests/dags_corrupted/test_impersonation_custom.py b/tests/dags_corrupted/test_impersonation_custom.py
index 77ea1ed..4d3b978 100644
--- a/tests/dags_corrupted/test_impersonation_custom.py
+++ b/tests/dags_corrupted/test_impersonation_custom.py
@@ -39,7 +39,7 @@ dag = DAG(dag_id='impersonation_with_custom_pkg', default_args=args)
def print_today():
date_time = FakeDatetime.utcnow()
- print('Today is {}'.format(date_time.strftime('%Y-%m-%d')))
+ print(f"Today is {date_time.strftime('%Y-%m-%d')}")
def check_hive_conf():
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 944fa49..44edc47 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -391,7 +391,7 @@ class ClassWithCustomAttributes:
setattr(self, key, value)
def __str__(self):
- return "{}({})".format(ClassWithCustomAttributes.__name__, str(self.__dict__))
+ return f"{ClassWithCustomAttributes.__name__}({str(self.__dict__)})"
def __repr__(self):
return self.__str__()
diff --git a/tests/hooks/test_dbapi.py b/tests/hooks/test_dbapi.py
index 2cc916d..0f6c55a 100644
--- a/tests/hooks/test_dbapi.py
+++ b/tests/hooks/test_dbapi.py
@@ -125,7 +125,7 @@ class TestDbApiHook(unittest.TestCase):
commit_count = 2 # The first and last commit
assert commit_count == self.conn.commit.call_count
- sql = "INSERT INTO {} ({}) VALUES (%s)".format(table, target_fields[0])
+ sql = f"INSERT INTO {table} ({target_fields[0]}) VALUES (%s)"
for row in rows:
self.cur.execute.assert_any_call(sql, row)
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index 95607a6..f942b89 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -42,7 +42,7 @@ class ClassWithCustomAttributes:
setattr(self, key, value)
def __str__(self):
- return "{}({})".format(ClassWithCustomAttributes.__name__, str(self.__dict__))
+ return f"{ClassWithCustomAttributes.__name__}({str(self.__dict__)})"
def __repr__(self):
return self.__str__()
@@ -154,7 +154,7 @@ class TestBaseOperator(unittest.TestCase):
({"user_defined_macros": {"foo": "bar"}}, "{{ foo }}", {}, "bar"),
({"user_defined_macros": {"foo": "bar"}}, 1, {}, 1),
(
- {"user_defined_filters": {"hello": lambda name: "Hello %s" % name}},
+ {"user_defined_filters": {"hello": lambda name: f"Hello {name}"}},
"{{ 'world' | hello }}",
{},
"Hello world",
diff --git a/tests/models/test_connection.py b/tests/models/test_connection.py
index a96b89a..526d029 100644
--- a/tests/models/test_connection.py
+++ b/tests/models/test_connection.py
@@ -55,7 +55,7 @@ class UriTestCaseConfig:
@staticmethod
def uri_test_name(func, num, param):
- return "{}_{}_{}".format(func.__name__, num, param.args[0].description.replace(' ', '_'))
+ return f"{func.__name__}_{num}_{param.args[0].description.replace(' ', '_')}"
class TestConnection(unittest.TestCase):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 60171d8..123c119 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -476,7 +476,7 @@ class TestDag(unittest.TestCase):
def test_user_defined_filters(self):
def jinja_udf(name):
- return 'Hello %s' % name
+ return f'Hello {name}'
dag = models.DAG('test-dag', start_date=DEFAULT_DATE, user_defined_filters={"hello": jinja_udf})
jinja_env = dag.get_template_env()
@@ -1540,7 +1540,7 @@ class TestDag(unittest.TestCase):
)
for i in range(2):
- DummyOperator(task_id='{}-task-{}'.format(child_dag_name, i + 1), dag=dag_subdag)
+ DummyOperator(task_id=f'{child_dag_name}-task-{i + 1}', dag=dag_subdag)
return dag_subdag
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 6c6b1cb..ce11892 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -105,7 +105,7 @@ class TestDagBag(unittest.TestCase):
dagbag = models.DagBag(include_examples=False, safe_mode=True)
assert len(dagbag.dagbag_stats) == 1
- assert dagbag.dagbag_stats[0].file == "/{}".format(os.path.basename(f.name))
+ assert dagbag.dagbag_stats[0].file == f"/{os.path.basename(f.name)}"
def test_safe_mode_heuristic_mismatch(self):
"""With safe mode enabled, a file not matching the discovery heuristics
@@ -122,7 +122,7 @@ class TestDagBag(unittest.TestCase):
with conf_vars({('core', 'dags_folder'): self.empty_dir}):
dagbag = models.DagBag(include_examples=False, safe_mode=False)
assert len(dagbag.dagbag_stats) == 1
- assert dagbag.dagbag_stats[0].file == "/{}".format(os.path.basename(f.name))
+ assert dagbag.dagbag_stats[0].file == f"/{os.path.basename(f.name)}"
def test_process_file_that_contains_multi_bytes_char(self):
"""
@@ -298,7 +298,7 @@ class TestDagBag(unittest.TestCase):
actual_found_dag_ids = list(map(lambda dag: dag.dag_id, actual_found_dags))
for dag_id in expected_dag_ids:
- actual_dagbag.log.info('validating %s' % dag_id)
+ actual_dagbag.log.info(f'validating {dag_id}')
assert (
dag_id in actual_found_dag_ids
) == should_be_found, 'dag "{}" should {}have been found after processing dag "{}"'.format(
diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py
index 1cf4e3f..f754753 100644
--- a/tests/models/test_renderedtifields.py
+++ b/tests/models/test_renderedtifields.py
@@ -50,7 +50,7 @@ class ClassWithCustomAttributes:
setattr(self, key, value)
def __str__(self):
- return "{}({})".format(ClassWithCustomAttributes.__name__, str(self.__dict__))
+ return f"{ClassWithCustomAttributes.__name__}({str(self.__dict__)})"
def __repr__(self):
return self.__str__()
diff --git a/tests/providers/amazon/aws/hooks/test_batch_waiters.py b/tests/providers/amazon/aws/hooks/test_batch_waiters.py
index 51d42e0..b852c2e 100644
--- a/tests/providers/amazon/aws/hooks/test_batch_waiters.py
+++ b/tests/providers/amazon/aws/hooks/test_batch_waiters.py
@@ -198,7 +198,7 @@ def batch_infrastructure(
assert resp["jobDefinitionArn"]
job_definition_arn = resp["jobDefinitionArn"]
assert resp["revision"]
- assert resp["jobDefinitionArn"].endswith("{}:{}".format(resp["jobDefinitionName"], resp["revision"]))
+ assert resp["jobDefinitionArn"].endswith(f"{resp['jobDefinitionName']}:{resp['revision']}")
infrastructure.vpc_id = vpc_id
infrastructure.subnet_id = subnet_id
diff --git a/tests/providers/amazon/aws/hooks/test_s3.py b/tests/providers/amazon/aws/hooks/test_s3.py
index d962068..fc00d9d 100644
--- a/tests/providers/amazon/aws/hooks/test_s3.py
+++ b/tests/providers/amazon/aws/hooks/test_s3.py
@@ -118,8 +118,8 @@ class TestAwsS3Hook:
bucket = hook.get_bucket(s3_bucket)
# we don't need to test the paginator that's covered by boto tests
- keys = ["%s/b" % i for i in range(2)]
- dirs = ["%s/" % i for i in range(2)]
+ keys = [f"{i}/b" for i in range(2)]
+ dirs = [f"{i}/" for i in range(2)]
for key in keys:
bucket.put_object(Key=key, Body=b'a')
diff --git a/tests/providers/apache/hive/operators/test_hive_stats.py b/tests/providers/apache/hive/operators/test_hive_stats.py
index d38e007..02dbdd8 100644
--- a/tests/providers/apache/hive/operators/test_hive_stats.py
+++ b/tests/providers/apache/hive/operators/test_hive_stats.py
@@ -274,17 +274,13 @@ class TestHiveStatsCollectionOperator(TestHiveEnvironment):
hive_stats_collection_operator = HiveStatsCollectionOperator(**self.kwargs)
hive_stats_collection_operator.execute(context={})
- sql = """
+ sql = f"""
DELETE FROM hive_stats
WHERE
- table_name='{}' AND
- partition_repr='{}' AND
- dttm='{}';
- """.format(
- hive_stats_collection_operator.table,
- mock_json_dumps.return_value,
- hive_stats_collection_operator.dttm,
- )
+ table_name='{hive_stats_collection_operator.table}' AND
+ partition_repr='{mock_json_dumps.return_value}' AND
+ dttm='{hive_stats_collection_operator.dttm}';
+ """
mock_mysql_hook.return_value.run.assert_called_once_with(sql)
@unittest.skipIf(
diff --git a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
index c6f7736..0413b5c 100644
--- a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
+++ b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
@@ -317,8 +317,8 @@ class TestTransfer(unittest.TestCase):
with hook.get_conn() as conn:
conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
conn.execute(
- """
- CREATE TABLE {} (
+ f"""
+ CREATE TABLE {mysql_table} (
c0 TINYINT,
c1 SMALLINT,
c2 MEDIUMINT,
@@ -326,9 +326,7 @@ class TestTransfer(unittest.TestCase):
c4 BIGINT,
c5 TIMESTAMP
)
- """.format(
- mysql_table
- )
+ """
)
op = MySqlToHiveOperator(
@@ -368,14 +366,12 @@ class TestTransfer(unittest.TestCase):
with hook.get_conn() as conn:
conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
conn.execute(
- """
- CREATE TABLE {} (
+ f"""
+ CREATE TABLE {mysql_table} (
c0 VARCHAR(25),
c1 VARCHAR(25)
)
- """.format(
- mysql_table
- )
+ """
)
conn.execute(
"""
@@ -475,8 +471,8 @@ class TestTransfer(unittest.TestCase):
with hook.get_conn() as conn:
conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
conn.execute(
- """
- CREATE TABLE {} (
+ f"""
+ CREATE TABLE {mysql_table} (
c0 TINYINT UNSIGNED,
c1 SMALLINT UNSIGNED,
c2 MEDIUMINT UNSIGNED,
@@ -488,9 +484,7 @@ class TestTransfer(unittest.TestCase):
c8 INT,
c9 BIGINT
)
- """.format(
- mysql_table
- )
+ """
)
conn.execute(
"""
diff --git a/tests/providers/apache/spark/hooks/test_spark_sql.py b/tests/providers/apache/spark/hooks/test_spark_sql.py
index 85a5159..35e4330 100644
--- a/tests/providers/apache/spark/hooks/test_spark_sql.py
+++ b/tests/providers/apache/spark/hooks/test_spark_sql.py
@@ -60,11 +60,11 @@ class TestSparkSqlHook(unittest.TestCase):
cmd = ' '.join(hook._prepare_command(""))
# Check all the parameters
- assert "--executor-cores {}".format(self._config['executor_cores']) in cmd
- assert "--executor-memory {}".format(self._config['executor_memory']) in cmd
- assert "--keytab {}".format(self._config['keytab']) in cmd
- assert "--name {}".format(self._config['name']) in cmd
- assert "--num-executors {}".format(self._config['num_executors']) in cmd
+ assert f"--executor-cores {self._config['executor_cores']}" in cmd
+ assert f"--executor-memory {self._config['executor_memory']}" in cmd
+ assert f"--keytab {self._config['keytab']}" in cmd
+ assert f"--name {self._config['name']}" in cmd
+ assert f"--num-executors {self._config['num_executors']}" in cmd
sql_path = get_after('-f', hook._prepare_command(""))
assert self._config['sql'].strip() == sql_path
diff --git a/tests/providers/apache/sqoop/hooks/test_sqoop.py b/tests/providers/apache/sqoop/hooks/test_sqoop.py
index 332021a..08926d4 100644
--- a/tests/providers/apache/sqoop/hooks/test_sqoop.py
+++ b/tests/providers/apache/sqoop/hooks/test_sqoop.py
@@ -179,29 +179,29 @@ class TestSqoopHook(unittest.TestCase):
# Check if the config has been extracted from the json
if self._config_json['namenode']:
- assert "-fs {}".format(self._config_json['namenode']) in cmd
+ assert f"-fs {self._config_json['namenode']}" in cmd
if self._config_json['job_tracker']:
- assert "-jt {}".format(self._config_json['job_tracker']) in cmd
+ assert f"-jt {self._config_json['job_tracker']}" in cmd
if self._config_json['libjars']:
- assert "-libjars {}".format(self._config_json['libjars']) in cmd
+ assert f"-libjars {self._config_json['libjars']}" in cmd
if self._config_json['files']:
- assert "-files {}".format(self._config_json['files']) in cmd
+ assert f"-files {self._config_json['files']}" in cmd
if self._config_json['archives']:
- assert "-archives {}".format(self._config_json['archives']) in cmd
+ assert f"-archives {self._config_json['archives']}" in cmd
- assert "--hcatalog-database {}".format(self._config['hcatalog_database']) in cmd
- assert "--hcatalog-table {}".format(self._config['hcatalog_table']) in cmd
+ assert f"--hcatalog-database {self._config['hcatalog_database']}" in cmd
+ assert f"--hcatalog-table {self._config['hcatalog_table']}" in cmd
# Check the regulator stuff passed by the default constructor
if self._config['verbose']:
assert "--verbose" in cmd
if self._config['num_mappers']:
- assert "--num-mappers {}".format(self._config['num_mappers']) in cmd
+ assert f"--num-mappers {self._config['num_mappers']}" in cmd
for key, value in self._config['properties'].items():
assert f"-D {key}={value}" in cmd
@@ -243,21 +243,14 @@ class TestSqoopHook(unittest.TestCase):
)
)
- assert "--input-null-string {}".format(self._config_export['input_null_string']) in cmd
- assert "--input-null-non-string {}".format(self._config_export['input_null_non_string']) in cmd
- assert "--staging-table {}".format(self._config_export['staging_table']) in cmd
- assert "--enclosed-by {}".format(self._config_export['enclosed_by']) in cmd
- assert "--escaped-by {}".format(self._config_export['escaped_by']) in cmd
- assert (
- "--input-fields-terminated-by {}".format(self._config_export['input_fields_terminated_by']) in cmd
- )
- assert (
- "--input-lines-terminated-by {}".format(self._config_export['input_lines_terminated_by']) in cmd
- )
- assert (
- "--input-optionally-enclosed-by {}".format(self._config_export['input_optionally_enclosed_by'])
- in cmd
- )
+ assert f"--input-null-string {self._config_export['input_null_string']}" in cmd
+ assert f"--input-null-non-string {self._config_export['input_null_non_string']}" in cmd
+ assert f"--staging-table {self._config_export['staging_table']}" in cmd
+ assert f"--enclosed-by {self._config_export['enclosed_by']}" in cmd
+ assert f"--escaped-by {self._config_export['escaped_by']}" in cmd
+ assert f"--input-fields-terminated-by {self._config_export['input_fields_terminated_by']}" in cmd
+ assert f"--input-lines-terminated-by {self._config_export['input_lines_terminated_by']}" in cmd
+ assert f"--input-optionally-enclosed-by {self._config_export['input_optionally_enclosed_by']}" in cmd
# these options are from the extra export options
assert "--update-key id" in cmd
assert "--update-mode allowinsert" in cmd
@@ -301,10 +294,10 @@ class TestSqoopHook(unittest.TestCase):
if self._config_import['direct']:
assert '--direct' in cmd
- assert '--target-dir {}'.format(self._config_import['target_dir']) in cmd
+ assert f"--target-dir {self._config_import['target_dir']}" in cmd
- assert '--driver {}'.format(self._config_import['driver']) in cmd
- assert '--split-by {}'.format(self._config_import['split_by']) in cmd
+ assert f"--driver {self._config_import['driver']}" in cmd
+ assert f"--split-by {self._config_import['split_by']}" in cmd
# these are from extra options, but not passed to this cmd import command
assert '--show' not in cmd
assert 'hcatalog-storage-stanza \"stored as orcfile\"' not in cmd
diff --git a/tests/providers/elasticsearch/log/elasticmock/__init__.py b/tests/providers/elasticsearch/log/elasticmock/__init__.py
index 2490dbe..c2a9080 100644
--- a/tests/providers/elasticsearch/log/elasticmock/__init__.py
+++ b/tests/providers/elasticsearch/log/elasticmock/__init__.py
@@ -51,7 +51,7 @@ ELASTIC_INSTANCES = {} # type: Dict[str, FakeElasticsearch]
def _get_elasticmock(hosts=None, *args, **kwargs): # pylint: disable=unused-argument
host = _normalize_hosts(hosts)[0]
- elastic_key = '{}:{}'.format(host.get('host', 'localhost'), host.get('port', 9200))
+ elastic_key = f"{host.get('host', 'localhost')}:{host.get('port', 9200)}"
if elastic_key in ELASTIC_INSTANCES:
connection = ELASTIC_INSTANCES.get(elastic_key)
diff --git a/tests/providers/google/cloud/hooks/test_cloud_sql.py b/tests/providers/google/cloud/hooks/test_cloud_sql.py
index 003245d..9e9ec70 100644
--- a/tests/providers/google/cloud/hooks/test_cloud_sql.py
+++ b/tests/providers/google/cloud/hooks/test_cloud_sql.py
@@ -1061,9 +1061,7 @@ class TestCloudSqlDatabaseQueryHook(unittest.TestCase):
project = self.sql_connection.extra_dejson['project_id']
location = self.sql_connection.extra_dejson['location']
instance = self.sql_connection.extra_dejson['instance']
- instance_spec = "{project}:{location}:{instance}".format(
- project=project, location=location, instance=instance
- )
+ instance_spec = f"{project}:{location}:{instance}"
assert sqlproxy_runner.instance_specification == instance_spec
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection")
diff --git a/tests/providers/google/cloud/hooks/test_pubsub.py b/tests/providers/google/cloud/hooks/test_pubsub.py
index 0841806..4086526 100644
--- a/tests/providers/google/cloud/hooks/test_pubsub.py
+++ b/tests/providers/google/cloud/hooks/test_pubsub.py
@@ -130,17 +130,17 @@ class TestPubSubHook(unittest.TestCase):
@mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
def test_delete_nonexisting_topic_failifnotexists(self, mock_service):
mock_service.return_value.delete_topic.side_effect = NotFound(
- 'Topic does not exists: %s' % EXPANDED_TOPIC
+ f'Topic does not exists: {EXPANDED_TOPIC}'
)
with pytest.raises(PubSubException) as ctx:
self.pubsub_hook.delete_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_not_exists=True)
- assert str(ctx.value) == 'Topic does not exist: %s' % EXPANDED_TOPIC
+ assert str(ctx.value) == f'Topic does not exist: {EXPANDED_TOPIC}'
@mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
def test_delete_topic_api_call_error(self, mock_service):
mock_service.return_value.delete_topic.side_effect = GoogleAPICallError(
- 'Error deleting topic: %s' % EXPANDED_TOPIC
+ f'Error deleting topic: {EXPANDED_TOPIC}'
)
with pytest.raises(PubSubException):
self.pubsub_hook.delete_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_not_exists=True)
@@ -148,23 +148,23 @@ class TestPubSubHook(unittest.TestCase):
@mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
def test_create_preexisting_topic_failifexists(self, mock_service):
mock_service.return_value.create_topic.side_effect = AlreadyExists(
- 'Topic already exists: %s' % TEST_TOPIC
+ f'Topic already exists: {TEST_TOPIC}'
)
with pytest.raises(PubSubException) as ctx:
self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_exists=True)
- assert str(ctx.value) == 'Topic already exists: %s' % TEST_TOPIC
+ assert str(ctx.value) == f'Topic already exists: {TEST_TOPIC}'
@mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
def test_create_preexisting_topic_nofailifexists(self, mock_service):
mock_service.return_value.create_topic.side_effect = AlreadyExists(
- 'Topic already exists: %s' % EXPANDED_TOPIC
+ f'Topic already exists: {EXPANDED_TOPIC}'
)
self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC)
@mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
def test_create_topic_api_call_error(self, mock_service):
mock_service.return_value.create_topic.side_effect = GoogleAPICallError(
- 'Error creating topic: %s' % TEST_TOPIC
+ f'Error creating topic: {TEST_TOPIC}'
)
with pytest.raises(PubSubException):
self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_exists=True)
@@ -238,18 +238,18 @@ class TestPubSubHook(unittest.TestCase):
@mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
def test_delete_nonexisting_subscription_failifnotexists(self, mock_service):
mock_service.delete_subscription.side_effect = NotFound(
- 'Subscription does not exists: %s' % EXPANDED_SUBSCRIPTION
+ f'Subscription does not exists: {EXPANDED_SUBSCRIPTION}'
)
with pytest.raises(PubSubException) as ctx:
self.pubsub_hook.delete_subscription(
project_id=TEST_PROJECT, subscription=TEST_SUBSCRIPTION, fail_if_not_exists=True
)
- assert str(ctx.value) == 'Subscription does not exist: %s' % EXPANDED_SUBSCRIPTION
+ assert str(ctx.value) == f'Subscription does not exist: {EXPANDED_SUBSCRIPTION}'
@mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
def test_delete_subscription_api_call_error(self, mock_service):
mock_service.delete_subscription.side_effect = GoogleAPICallError(
- 'Error deleting subscription %s' % EXPANDED_SUBSCRIPTION
+ f'Error deleting subscription {EXPANDED_SUBSCRIPTION}'
)
with pytest.raises(PubSubException):
self.pubsub_hook.delete_subscription(
@@ -262,7 +262,7 @@ class TestPubSubHook(unittest.TestCase):
self, mock_uuid, mock_service
): # noqa # pylint: disable=unused-argument,line-too-long
create_method = mock_service.create_subscription
- expected_name = EXPANDED_SUBSCRIPTION.replace(TEST_SUBSCRIPTION, 'sub-%s' % TEST_UUID)
+ expected_name = EXPANDED_SUBSCRIPTION.replace(TEST_SUBSCRIPTION, f'sub-{TEST_UUID}')
response = self.pubsub_hook.create_subscription(project_id=TEST_PROJECT, topic=TEST_TOPIC)
create_method.assert_called_once_with(
@@ -282,7 +282,7 @@ class TestPubSubHook(unittest.TestCase):
timeout=None,
metadata=None,
)
- assert 'sub-%s' % TEST_UUID == response
+ assert f'sub-{TEST_UUID}' == response
@mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
def test_create_subscription_with_ack_deadline(self, mock_service):
@@ -342,18 +342,18 @@ class TestPubSubHook(unittest.TestCase):
@mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
def test_create_subscription_failifexists(self, mock_service):
mock_service.create_subscription.side_effect = AlreadyExists(
- 'Subscription already exists: %s' % EXPANDED_SUBSCRIPTION
+ f'Subscription already exists: {EXPANDED_SUBSCRIPTION}'
)
with pytest.raises(PubSubException) as ctx:
self.pubsub_hook.create_subscription(
project_id=TEST_PROJECT, topic=TEST_TOPIC, subscription=TEST_SUBSCRIPTION, fail_if_exists=True
)
- assert str(ctx.value) == 'Subscription already exists: %s' % EXPANDED_SUBSCRIPTION
+ assert str(ctx.value) == f'Subscription already exists: {EXPANDED_SUBSCRIPTION}'
@mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
def test_create_subscription_api_call_error(self, mock_service):
mock_service.create_subscription.side_effect = GoogleAPICallError(
- 'Error creating subscription %s' % EXPANDED_SUBSCRIPTION
+ f'Error creating subscription {EXPANDED_SUBSCRIPTION}'
)
with pytest.raises(PubSubException):
self.pubsub_hook.create_subscription(
@@ -363,7 +363,7 @@ class TestPubSubHook(unittest.TestCase):
@mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
def test_create_subscription_nofailifexists(self, mock_service):
mock_service.create_subscription.side_effect = AlreadyExists(
- 'Subscription already exists: %s' % EXPANDED_SUBSCRIPTION
+ f'Subscription already exists: {EXPANDED_SUBSCRIPTION}'
)
response = self.pubsub_hook.create_subscription(
project_id=TEST_PROJECT, topic=TEST_TOPIC, subscription=TEST_SUBSCRIPTION
diff --git a/tests/providers/google/cloud/operators/test_dataflow.py b/tests/providers/google/cloud/operators/test_dataflow.py
index 7e290d7..c682a31 100644
--- a/tests/providers/google/cloud/operators/test_dataflow.py
+++ b/tests/providers/google/cloud/operators/test_dataflow.py
@@ -55,7 +55,7 @@ DEFAULT_OPTIONS_TEMPLATE = {
'zone': 'us-central1-f',
}
ADDITIONAL_OPTIONS = {'output': 'gs://test/output', 'labels': {'foo': 'bar'}}
-TEST_VERSION = 'v{}'.format(version.replace('.', '-').replace('+', '-'))
+TEST_VERSION = f"v{version.replace('.', '-').replace('+', '-')}"
EXPECTED_ADDITIONAL_OPTIONS = {
'output': 'gs://test/output',
'labels': {'foo': 'bar', 'airflow-version': TEST_VERSION},
diff --git a/tests/providers/google/cloud/operators/test_mlengine_utils.py b/tests/providers/google/cloud/operators/test_mlengine_utils.py
index 539ee60..65b41b6 100644
--- a/tests/providers/google/cloud/operators/test_mlengine_utils.py
+++ b/tests/providers/google/cloud/operators/test_mlengine_utils.py
@@ -28,7 +28,7 @@ from airflow.providers.google.cloud.utils import mlengine_operator_utils
from airflow.version import version
DEFAULT_DATE = datetime.datetime(2017, 6, 6)
-TEST_VERSION = 'v{}'.format(version.replace('.', '-').replace('+', '-'))
+TEST_VERSION = f"v{version.replace('.', '-').replace('+', '-')}"
class TestCreateEvaluateOps(unittest.TestCase):
@@ -80,7 +80,7 @@ class TestCreateEvaluateOps(unittest.TestCase):
input_paths=input_with_model['inputPaths'],
prediction_path=input_with_model['outputPath'],
metric_fn_and_keys=(self.metric_fn, ['err']),
- validate_fn=(lambda x: 'err=%.1f' % x['err']),
+ validate_fn=(lambda x: f"err={x['err']:.1f}"),
dag=self.dag,
py_interpreter="python3",
)
@@ -168,7 +168,7 @@ class TestCreateEvaluateOps(unittest.TestCase):
'input_paths': input_with_model['inputPaths'],
'prediction_path': input_with_model['outputPath'],
'metric_fn_and_keys': (self.metric_fn, ['err']),
- 'validate_fn': (lambda x: 'err=%.1f' % x['err']),
+ 'validate_fn': (lambda x: f"err={x['err']:.1f}"),
}
with pytest.raises(AirflowException, match='Missing model origin'):
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index e3c8917..77e0c35 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -175,7 +175,7 @@ class TestGoogleCloudStoragePrefixSensor(TestCase):
impersonation_chain=TEST_IMPERSONATION_CHAIN,
poke_interval=0,
)
- generated_messages = ['test-prefix/obj%s' % i for i in range(5)]
+ generated_messages = [f'test-prefix/obj{i}' for i in range(5)]
mock_hook.return_value.list.return_value = generated_messages
response = task.execute(None)
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py b/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py
index 1621829..b3c0acd 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py
@@ -30,7 +30,7 @@ class TestBigQueryToBigQueryOperator(unittest.TestCase):
@mock.patch('airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook')
def test_execute(self, mock_hook):
source_project_dataset_tables = f'{TEST_DATASET}.{TEST_TABLE_ID}'
- destination_project_dataset_table = '{}.{}'.format(TEST_DATASET + '_new', TEST_TABLE_ID)
+ destination_project_dataset_table = f"{TEST_DATASET + '_new'}.{TEST_TABLE_ID}"
write_disposition = 'WRITE_EMPTY'
create_disposition = 'CREATE_IF_NEEDED'
labels = {'k1': 'v1'}
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 5c0c38c..2df9fea 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -143,7 +143,7 @@ class TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase):
source_bucket=TEST_BUCKET,
source_object=SOURCE_OBJECT_WILDCARD_FILENAME,
destination_bucket=DESTINATION_BUCKET,
- destination_object='{}/{}'.format(DESTINATION_OBJECT_PREFIX, SOURCE_OBJECT_WILDCARD_SUFFIX[:-1]),
+ destination_object=f'{DESTINATION_OBJECT_PREFIX}/{SOURCE_OBJECT_WILDCARD_SUFFIX[:-1]}',
)
operator.execute(None)
diff --git a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py
index 289eb43..5e19c17 100644
--- a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py
+++ b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py
@@ -83,11 +83,11 @@ METRIC_KEYS_EXPECTED = ','.join(METRIC_KEYS)
def validate_err_and_count(summary):
if summary['err'] > 0.2:
- raise ValueError('Too high err>0.2; summary=%s' % summary)
+ raise ValueError(f'Too high err>0.2; summary={summary}')
if summary['mse'] > 0.05:
- raise ValueError('Too high mse>0.05; summary=%s' % summary)
+ raise ValueError(f'Too high mse>0.05; summary={summary}')
if summary['count'] < 1000:
- raise ValueError('Too few instances<1000; summary=%s' % summary)
+ raise ValueError(f'Too few instances<1000; summary={summary}')
return summary
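Both forms above stringify the whole dict: a bare %s and a plain f-string replacement field each fall back to str() on the argument, so the rendered messages are identical. A quick illustrative check (hypothetical value, not part of the commit):

summary = {'err': 0.3, 'mse': 0.01, 'count': 1500}  # hypothetical
assert 'summary=%s' % summary == f'summary={summary}'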
diff --git a/tests/providers/mysql/hooks/test_mysql.py b/tests/providers/mysql/hooks/test_mysql.py
index 538381f..19f0bd8 100644
--- a/tests/providers/mysql/hooks/test_mysql.py
+++ b/tests/providers/mysql/hooks/test_mysql.py
@@ -431,10 +431,8 @@ class TestMySql(unittest.TestCase):
from tests.test_utils.asserts import assert_equal_ignore_multiple_spaces
assert mock_execute.call_count == 1
- query = """
+ query = f"""
SELECT * INTO OUTFILE '{tmp_file}'
FROM {table}
- """.format(
- tmp_file=tmp_file, table=table
- )
+ """
assert_equal_ignore_multiple_spaces(self, mock_execute.call_args[0][0], query)
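The f prefix applies to triple-quoted strings as well, so the multi-line SQL above keeps its layout while the trailing .format() call disappears. A minimal sketch with hypothetical values (not part of the commit):

tmp_file, table = '/tmp/out.tsv', 'mytable'  # hypothetical values
query = f"""
    SELECT * INTO OUTFILE '{tmp_file}'
    FROM {table}
    """
# The placeholders are filled at the point the literal is evaluated.
assert tmp_file in query and table in query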
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 2046e22..9c75513 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -197,7 +197,7 @@ def make_user_defined_macro_filter_dag():
user_defined_macros={
'next_execution_date': compute_next_execution_date,
},
- user_defined_filters={'hello': lambda name: 'Hello %s' % name},
+ user_defined_filters={'hello': lambda name: f'Hello {name}'},
catchup=False,
)
BashOperator(
@@ -731,7 +731,7 @@ class TestStringifiedDAGs(unittest.TestCase):
setattr(self, key, value)
def __str__(self):
- return "{}({})".format(self.__class__.__name__, str(self.__dict__))
+ return f"{self.__class__.__name__}({str(self.__dict__)})"
def __repr__(self):
return self.__str__()
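flynt keeps the explicit str() call in the converted __str__ above, although a bare replacement field already formats with str() for objects that define no custom __format__, so the call is redundant rather than wrong. An illustrative sketch (not part of the commit):

class Demo:
    def __str__(self):
        # str() around self.__dict__ would change nothing here.
        return f"{self.__class__.__name__}({self.__dict__})"

d = Demo()
d.x = 1
assert str(d) == "Demo({'x': 1})"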
diff --git a/tests/test_utils/gcp_system_helpers.py b/tests/test_utils/gcp_system_helpers.py
index 6572111..314f09a 100644
--- a/tests/test_utils/gcp_system_helpers.py
+++ b/tests/test_utils/gcp_system_helpers.py
@@ -184,7 +184,7 @@ class GoogleSystemTest(SystemTest):
"gsutil",
"iam",
"ch",
- "serviceAccount:%s:admin" % account_email,
+ f"serviceAccount:{account_email}:admin",
bucket_name,
]
)
diff --git a/tests/test_utils/logging_command_executor.py b/tests/test_utils/logging_command_executor.py
index 1ebf729..5fca244 100644
--- a/tests/test_utils/logging_command_executor.py
+++ b/tests/test_utils/logging_command_executor.py
@@ -57,7 +57,7 @@ class LoggingCommandExecutor(LoggingMixin):
self.log.info("Stdout: %s", output)
self.log.info("Stderr: %s", err)
raise AirflowException(
- "Retcode {} on {} with stdout: {}, stderr: {}".format(retcode, " ".join(cmd), output, err)
+ f"Retcode {retcode} on {' '.join(cmd)} with stdout: {output}, stderr: {err}"
)
return output
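Note the quote choice in the converted line above: the braces contain ' '.join(cmd), and before Python 3.12 an f-string cannot reuse its own quote character inside a replacement field, so the literal itself must use double quotes. An illustrative sketch with hypothetical values (not part of the commit):

cmd, retcode, output, err = ['ls', '-l'], 2, '', 'denied'  # hypothetical
msg = f"Retcode {retcode} on {' '.join(cmd)} with stdout: {output}, stderr: {err}"
assert msg == 'Retcode 2 on ls -l with stdout: , stderr: denied'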
diff --git a/tests/test_utils/mock_operators.py b/tests/test_utils/mock_operators.py
index 534770e..989b984 100644
--- a/tests/test_utils/mock_operators.py
+++ b/tests/test_utils/mock_operators.py
@@ -82,7 +82,7 @@ class CustomBaseIndexOpLink(BaseOperatorLink):
@property
def name(self) -> str:
- return 'BigQuery Console #{index}'.format(index=self.index + 1)
+ return f'BigQuery Console #{self.index + 1}'
def get_link(self, operator, dttm):
ti = TaskInstance(task=operator, execution_date=dttm)
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index fffa2d4..0477c01 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -43,9 +43,7 @@ class TestHelpers(unittest.TestCase):
filename_template = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log"
ts = ti.get_template_context()['ts']
- expected_filename = "{dag_id}/{task_id}/{ts}/{try_number}.log".format(
- dag_id=dag_id, task_id=task_id, ts=ts, try_number=try_number
- )
+ expected_filename = f"{dag_id}/{task_id}/{ts}/{try_number}.log"
rendered_filename = helpers.render_log_filename(ti, try_number, filename_template)
diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py
index 5981eac..1a379d9 100644
--- a/tests/www/api/experimental/test_endpoints.py
+++ b/tests/www/api/experimental/test_endpoints.py
@@ -368,7 +368,7 @@ class TestPoolApiExperimental(TestBase):
clear_db_pools()
self.pools = [Pool.get_default_pool()]
for i in range(self.USER_POOL_COUNT):
- name = 'experimental_%s' % (i + 1)
+ name = f'experimental_{i + 1}'
pool = Pool(
pool=name,
slots=i,
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 5011547..929076b 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -757,7 +757,7 @@ class TestAirflowBaseViews(TestBase):
url = 'dag_details?dag_id=test_tree_view'
resp = self.client.get(url, follow_redirects=True)
params = {'dag_id': 'test_tree_view', 'origin': '/tree?dag_id=test_tree_view'}
- href = "/trigger?{}".format(html.escape(urllib.parse.urlencode(params)))
+ href = f"/trigger?{html.escape(urllib.parse.urlencode(params))}"
self.check_content_in_response(href, resp)
def test_dag_details_trigger_origin_graph_view(self):
@@ -772,7 +772,7 @@ class TestAirflowBaseViews(TestBase):
url = 'dag_details?dag_id=test_graph_view'
resp = self.client.get(url, follow_redirects=True)
params = {'dag_id': 'test_graph_view', 'origin': '/graph?dag_id=test_graph_view'}
- href = "/trigger?{}".format(html.escape(urllib.parse.urlencode(params)))
+ href = f"/trigger?{html.escape(urllib.parse.urlencode(params))}"
self.check_content_in_response(href, resp)
def test_dag_details_subdag(self):
@@ -1177,9 +1177,7 @@ class TestLogView(TestBase):
DAG_ID_REMOVED = 'removed_dag_for_testing_log_view'
TASK_ID = 'task_for_testing_log_view'
DEFAULT_DATE = timezone.datetime(2017, 9, 1)
- ENDPOINT = 'log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}'.format(
- dag_id=DAG_ID, task_id=TASK_ID, execution_date=DEFAULT_DATE
- )
+ ENDPOINT = f'log?dag_id={DAG_ID}&task_id={TASK_ID}&execution_date={DEFAULT_DATE}'
def setUp(self):
# Make sure that the configure_logging is not cached
@@ -1277,7 +1275,7 @@ class TestLogView(TestBase):
for num in range(1, expected_num_logs_visible + 1):
assert f'log-group-{num}' in response.data.decode('utf-8')
assert 'log-group-0' not in response.data.decode('utf-8')
- assert 'log-group-{}'.format(expected_num_logs_visible + 1) not in response.data.decode('utf-8')
+ assert f'log-group-{expected_num_logs_visible + 1}' not in response.data.decode('utf-8')
def test_get_logs_with_metadata_as_download_file(self):
url_template = (
@@ -1540,7 +1538,7 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester:
Should set base date to execution date.
"""
response = self.test.client.get(
- self.endpoint + '&execution_date={}'.format(self.runs[1].execution_date.isoformat()),
+ self.endpoint + f'&execution_date={self.runs[1].execution_date.isoformat()}',
data=dict(username='test', password='test'),
follow_redirects=True,
)
@@ -1563,7 +1561,7 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester:
Should set base date and num runs to submitted values.
"""
response = self.test.client.get(
- self.endpoint + '&base_date={}&num_runs=2'.format(self.runs[1].execution_date.isoformat()),
+ self.endpoint + f'&base_date={self.runs[1].execution_date.isoformat()}&num_runs=2',
data=dict(username='test', password='test'),
follow_redirects=True,
)
@@ -2849,7 +2847,7 @@ class TestExtraLinks(TestBase):
name = 'foo-bar'
def get_link(self, operator, dttm):
- return 'http://www.example.com/{}/{}/{}'.format(operator.task_id, 'foo-bar', dttm)
+ return f"http://www.example.com/{operator.task_id}/foo-bar/{dttm}"
class AirflowLink(BaseOperatorLink):
name = 'airflow'