You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/03/03 09:32:26 UTC

[airflow] 02/41: Switch to f-strings using flynt. (#13732)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 95889128afec4ebb8a3dfe8a8e13a7264d1a3bd6
Author: Joshua Carp <jm...@gmail.com>
AuthorDate: Sat Jan 23 00:19:38 2021 -0500

    Switch to f-strings using flynt. (#13732)
    
    (cherry picked from commit a9ac2b040b64de1aa5d9c2b9def33334e36a8d22)
---
 .pre-commit-config.yaml                            |  7 ++++
 BREEZE.rst                                         |  2 +-
 STATIC_CODE_CHECKS.rst                             |  2 +
 airflow/api/common/experimental/get_code.py        |  2 +-
 airflow/api/common/experimental/pool.py            |  6 +--
 .../api_connexion/endpoints/connection_endpoint.py |  2 +-
 airflow/cli/commands/dag_command.py                |  2 +-
 airflow/cli/commands/task_command.py               | 11 ++----
 airflow/cli/commands/user_command.py               |  8 ++--
 airflow/cli/commands/variable_command.py           |  2 +-
 airflow/configuration.py                           |  2 +-
 .../example_passing_params_via_test_command.py     |  6 +--
 airflow/example_dags/example_trigger_target_dag.py |  2 +-
 airflow/example_dags/subdags/subdag.py             |  2 +-
 airflow/example_dags/tutorial_taskflow_api_etl.py  |  2 +-
 airflow/hooks/dbapi.py                             |  2 +-
 airflow/kubernetes/refresh_config.py               |  2 +-
 airflow/models/connection.py                       |  6 +--
 airflow/models/dag.py                              |  2 +-
 airflow/models/taskinstance.py                     |  4 +-
 airflow/models/xcom.py                             |  4 +-
 airflow/operators/sql.py                           | 14 +++----
 airflow/providers/amazon/aws/hooks/datasync.py     |  4 +-
 airflow/providers/amazon/aws/hooks/dynamodb.py     |  4 +-
 airflow/providers/amazon/aws/hooks/sagemaker.py    |  4 +-
 .../providers/amazon/aws/log/s3_task_handler.py    |  2 +-
 airflow/providers/amazon/aws/operators/datasync.py |  2 +-
 .../amazon/aws/operators/emr_add_steps.py          |  2 +-
 .../amazon/aws/operators/emr_create_job_flow.py    |  2 +-
 .../amazon/aws/operators/emr_modify_cluster.py     |  2 +-
 .../amazon/aws/operators/emr_terminate_job_flow.py |  2 +-
 .../amazon/aws/operators/sagemaker_endpoint.py     |  2 +-
 .../aws/operators/sagemaker_endpoint_config.py     |  2 +-
 .../amazon/aws/operators/sagemaker_model.py        |  2 +-
 .../amazon/aws/operators/sagemaker_processing.py   |  2 +-
 .../amazon/aws/operators/sagemaker_training.py     |  2 +-
 .../amazon/aws/operators/sagemaker_transform.py    |  2 +-
 .../amazon/aws/operators/sagemaker_tuning.py       |  2 +-
 .../providers/amazon/aws/sensors/sagemaker_base.py |  2 +-
 airflow/providers/apache/druid/hooks/druid.py      |  4 +-
 airflow/providers/apache/hdfs/sensors/hdfs.py      |  2 +-
 airflow/providers/apache/hive/hooks/hive.py        |  8 +---
 .../providers/apache/hive/operators/hive_stats.py  | 24 +++++-------
 airflow/providers/apache/spark/hooks/spark_jdbc.py |  2 +-
 .../providers/apache/spark/hooks/spark_submit.py   |  8 ++--
 .../providers/cncf/kubernetes/hooks/kubernetes.py  |  6 +--
 .../cncf/kubernetes/operators/kubernetes_pod.py    |  3 +-
 .../cncf/kubernetes/sensors/spark_kubernetes.py    |  2 +-
 airflow/providers/databricks/hooks/databricks.py   |  2 +-
 airflow/providers/docker/operators/docker_swarm.py |  2 +-
 airflow/providers/ftp/hooks/ftp.py                 |  4 +-
 airflow/providers/google/cloud/hooks/bigquery.py   |  4 +-
 airflow/providers/google/cloud/hooks/compute.py    | 20 +++-------
 airflow/providers/google/cloud/hooks/dataflow.py   | 12 +++---
 airflow/providers/google/cloud/hooks/functions.py  |  4 +-
 .../google/cloud/hooks/kubernetes_engine.py        |  2 +-
 .../providers/google/cloud/log/gcs_task_handler.py |  4 +-
 .../providers/google/cloud/operators/bigquery.py   |  4 +-
 .../providers/google/cloud/operators/dataproc.py   |  2 +-
 .../jenkins/operators/jenkins_job_trigger.py       |  6 +--
 airflow/providers/jira/hooks/jira.py               |  4 +-
 airflow/providers/jira/operators/jira.py           |  4 +-
 .../azure/operators/azure_container_instances.py   |  2 +-
 .../providers/microsoft/winrm/operators/winrm.py   |  2 +-
 airflow/providers/mysql/hooks/mysql.py             | 21 +++-------
 airflow/providers/opsgenie/hooks/opsgenie_alert.py |  2 +-
 airflow/providers/oracle/hooks/oracle.py           |  2 +-
 airflow/providers/pagerduty/hooks/pagerduty.py     |  2 +-
 airflow/providers/plexus/operators/job.py          |  4 +-
 airflow/providers/postgres/hooks/postgres.py       |  2 +-
 airflow/providers/qubole/hooks/qubole.py           |  4 +-
 airflow/providers/salesforce/hooks/salesforce.py   |  2 +-
 airflow/providers/sftp/operators/sftp.py           |  2 +-
 airflow/providers/ssh/operators/ssh.py             |  2 +-
 airflow/security/kerberos.py                       |  4 +-
 airflow/security/utils.py                          |  2 +-
 airflow/sensors/date_time.py                       |  2 +-
 airflow/sensors/sql.py                             |  2 +-
 airflow/utils/cli.py                               |  2 +-
 airflow/utils/code_utils.py                        |  2 +-
 airflow/utils/log/file_task_handler.py             |  6 +--
 airflow/utils/timezone.py                          |  2 +-
 airflow/www/utils.py                               | 10 ++---
 airflow/www/validators.py                          |  4 +-
 airflow/www/views.py                               | 19 +++------
 breeze-complete                                    |  1 +
 docs/exts/airflow_intersphinx.py                   |  2 +-
 docs/exts/exampleinclude.py                        |  2 +-
 docs/exts/redirects.py                             |  2 +-
 metastore_browser/hive_metastore.py                | 14 +++----
 .../pre_commit_check_provider_yaml_files.py        |  8 ++--
 scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py    |  6 +--
 tests/api/common/experimental/test_pool.py         |  2 +-
 tests/cli/commands/test_connection_command.py      | 14 +++----
 tests/core/test_core.py                            |  2 +-
 tests/dags/test_subdag.py                          |  2 +-
 tests/dags_corrupted/test_impersonation_custom.py  |  2 +-
 tests/executors/test_celery_executor.py            |  2 +-
 tests/hooks/test_dbapi.py                          |  2 +-
 tests/models/test_baseoperator.py                  |  4 +-
 tests/models/test_connection.py                    |  2 +-
 tests/models/test_dag.py                           |  4 +-
 tests/models/test_dagbag.py                        |  6 +--
 tests/models/test_renderedtifields.py              |  2 +-
 .../amazon/aws/hooks/test_batch_waiters.py         |  2 +-
 tests/providers/amazon/aws/hooks/test_s3.py        |  4 +-
 .../apache/hive/operators/test_hive_stats.py       | 14 +++----
 .../apache/hive/transfers/test_mysql_to_hive.py    | 24 +++++-------
 .../providers/apache/spark/hooks/test_spark_sql.py | 10 ++---
 tests/providers/apache/sqoop/hooks/test_sqoop.py   | 45 +++++++++-------------
 .../elasticsearch/log/elasticmock/__init__.py      |  2 +-
 .../providers/google/cloud/hooks/test_cloud_sql.py |  4 +-
 tests/providers/google/cloud/hooks/test_pubsub.py  | 32 +++++++--------
 .../google/cloud/operators/test_dataflow.py        |  2 +-
 .../google/cloud/operators/test_mlengine_utils.py  |  6 +--
 tests/providers/google/cloud/sensors/test_gcs.py   |  2 +-
 .../cloud/transfers/test_bigquery_to_bigquery.py   |  2 +-
 .../google/cloud/transfers/test_gcs_to_gcs.py      |  2 +-
 .../cloud/utils/test_mlengine_operator_utils.py    |  6 +--
 tests/providers/mysql/hooks/test_mysql.py          |  6 +--
 tests/serialization/test_dag_serialization.py      |  4 +-
 tests/test_utils/gcp_system_helpers.py             |  2 +-
 tests/test_utils/logging_command_executor.py       |  2 +-
 tests/test_utils/mock_operators.py                 |  2 +-
 tests/utils/test_helpers.py                        |  4 +-
 tests/www/api/experimental/test_endpoints.py       |  2 +-
 tests/www/test_views.py                            | 16 ++++----
 127 files changed, 280 insertions(+), 361 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index aea91e3..0eb96fd 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -609,4 +609,11 @@ repos:
         entry: "./scripts/ci/pre_commit/pre_commit_in_container_bats_test.sh"
         files: ^tests/bats/in_container/.*.bats$|^scripts/in_container/.*sh
         pass_filenames: false
+      - id: flynt
+        name: Convert to f-strings with flynt
+        entry: flynt
+        language: python
+        language_version: python3
+        additional_dependencies: ['flynt']
+        files: \.py$
         ## ONLY ADD PRE-COMMITS HERE THAT REQUIRE CI IMAGE
diff --git a/BREEZE.rst b/BREEZE.rst
index 131f94c..f4689ba 100644
--- a/BREEZE.rst
+++ b/BREEZE.rst
@@ -2232,7 +2232,7 @@ This is the current syntax for  `./breeze <./breeze>`_:
                  check-executables-have-shebangs check-hooks-apply check-integrations
                  check-merge-conflict check-xml consistent-pylint daysago-import-check
                  debug-statements detect-private-key doctoc dont-use-safe-filter end-of-file-fixer
-                 fix-encoding-pragma flake8 forbid-tabs helm-lint identity
+                 fix-encoding-pragma flake8 flynt forbid-tabs helm-lint identity
                  incorrect-use-of-LoggingMixin insert-license isort json-schema language-matters
                  lint-dockerfile lint-openapi markdownlint mermaid mixed-line-ending mypy mypy-helm
                  no-providers-in-core-examples no-relative-imports pre-commit-descriptions
diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst
index 55674bf..1c28dd9 100644
--- a/STATIC_CODE_CHECKS.rst
+++ b/STATIC_CODE_CHECKS.rst
@@ -102,6 +102,8 @@ require Breeze Docker images to be installed locally:
 ----------------------------------- ---------------------------------------------------------------- ------------
 ``flake8``                            Runs flake8.                                                         *
 ----------------------------------- ---------------------------------------------------------------- ------------
+``flynt``                             Runs flynt.
+----------------------------------- ---------------------------------------------------------------- ------------
 ``forbid-tabs``                       Fails if tabs are used in the project.
 ----------------------------------- ---------------------------------------------------------------- ------------
 ``helm-lint``                         Verifies if helm lint passes for the chart
diff --git a/airflow/api/common/experimental/get_code.py b/airflow/api/common/experimental/get_code.py
index 99f248b..79b0b9f 100644
--- a/airflow/api/common/experimental/get_code.py
+++ b/airflow/api/common/experimental/get_code.py
@@ -32,5 +32,5 @@ def get_code(dag_id: str) -> str:
     try:
         return DagCode.get_code_by_fileloc(dag.fileloc)
     except (OSError, DagCodeNotFound) as exception:
-        error_message = "Error {} while reading Dag id {} Code".format(str(exception), dag_id)
+        error_message = f"Error {str(exception)} while reading Dag id {dag_id} Code"
         raise AirflowException(error_message, exception)
diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py
index 0f1d1c7..30950ea 100644
--- a/airflow/api/common/experimental/pool.py
+++ b/airflow/api/common/experimental/pool.py
@@ -29,7 +29,7 @@ def get_pool(name, session=None):
 
     pool = session.query(Pool).filter_by(pool=name).first()
     if pool is None:
-        raise PoolNotFound("Pool '%s' doesn't exist" % name)
+        raise PoolNotFound(f"Pool '{name}' doesn't exist")
 
     return pool
 
@@ -49,7 +49,7 @@ def create_pool(name, slots, description, session=None):
     try:
         slots = int(slots)
     except ValueError:
-        raise AirflowBadRequest("Bad value for `slots`: %s" % slots)
+        raise AirflowBadRequest(f"Bad value for `slots`: {slots}")
 
     # Get the length of the pool column
     pool_name_length = Pool.pool.property.columns[0].type.length
@@ -81,7 +81,7 @@ def delete_pool(name, session=None):
 
     pool = session.query(Pool).filter_by(pool=name).first()
     if pool is None:
-        raise PoolNotFound("Pool '%s' doesn't exist" % name)
+        raise PoolNotFound(f"Pool '{name}' doesn't exist")
 
     session.delete(pool)
     session.commit()
diff --git a/airflow/api_connexion/endpoints/connection_endpoint.py b/airflow/api_connexion/endpoints/connection_endpoint.py
index ecee686..df3bd41 100644
--- a/airflow/api_connexion/endpoints/connection_endpoint.py
+++ b/airflow/api_connexion/endpoints/connection_endpoint.py
@@ -124,4 +124,4 @@ def post_connection(session):
         session.add(connection)
         session.commit()
         return connection_schema.dump(connection)
-    raise AlreadyExists(detail="Connection already exist. ID: %s" % conn_id)
+    raise AlreadyExists(detail=f"Connection already exist. ID: {conn_id}")
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 40f8834..9b050ec 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -169,7 +169,7 @@ def set_is_paused(is_paused, args):
 
     dag.set_is_paused(is_paused=is_paused)
 
-    print("Dag: {}, paused: {}".format(args.dag_id, str(is_paused)))
+    print(f"Dag: {args.dag_id}, paused: {is_paused}")
 
 
 def dag_show(args):
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index c2794d4..fac4c26 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -406,14 +406,11 @@ def task_render(args):
     for attr in task.__class__.template_fields:
         print(
             textwrap.dedent(
-                """\
+                f"""        # ----------------------------------------------------------
+        # property: {attr}
         # ----------------------------------------------------------
-        # property: {}
-        # ----------------------------------------------------------
-        {}
-        """.format(
-                    attr, getattr(task, attr)
-                )
+        {getattr(task, attr)}
+        """
             )
         )
 
diff --git a/airflow/cli/commands/user_command.py b/airflow/cli/commands/user_command.py
index 3fd80bd..1274745 100644
--- a/airflow/cli/commands/user_command.py
+++ b/airflow/cli/commands/user_command.py
@@ -98,7 +98,7 @@ def users_manage_role(args, remove=False):
     appbuilder = cached_app().appbuilder  # pylint: disable=no-member
     user = appbuilder.sm.find_user(username=args.username) or appbuilder.sm.find_user(email=args.email)
     if not user:
-        raise SystemExit('User "{}" does not exist'.format(args.username or args.email))
+        raise SystemExit(f'User "{args.username or args.email}" does not exist')
 
     role = appbuilder.sm.find_role(args.role)
     if not role:
@@ -144,7 +144,7 @@ def users_export(args):
 
     with open(args.export, 'w') as file:
         file.write(json.dumps(users, sort_keys=True, indent=4))
-        print("{} users successfully exported to {}".format(len(users), file.name))
+        print(f"{len(users)} users successfully exported to {file.name}")
 
 
 @cli_utils.action_logging
@@ -191,7 +191,7 @@ def _import_users(users_list):  # pylint: disable=redefined-outer-name
 
         existing_user = appbuilder.sm.find_user(email=user['email'])
         if existing_user:
-            print("Found existing user with email '{}'".format(user['email']))
+            print(f"Found existing user with email '{user['email']}'")
             existing_user.roles = roles
             existing_user.first_name = user['firstname']
             existing_user.last_name = user['lastname']
@@ -206,7 +206,7 @@ def _import_users(users_list):  # pylint: disable=redefined-outer-name
             appbuilder.sm.update_user(existing_user)
             users_updated.append(user['email'])
         else:
-            print("Creating new user with email '{}'".format(user['email']))
+            print(f"Creating new user with email '{user['email']}'")
             appbuilder.sm.add_user(
                 username=user['username'],
                 first_name=user['firstname'],
diff --git a/airflow/cli/commands/variable_command.py b/airflow/cli/commands/variable_command.py
index 55cf94b..526f094 100644
--- a/airflow/cli/commands/variable_command.py
+++ b/airflow/cli/commands/variable_command.py
@@ -92,7 +92,7 @@ def _import_helper(filepath):
             try:
                 Variable.set(k, v, serialize_json=not isinstance(v, str))
             except Exception as e:  # pylint: disable=broad-except
-                print('Variable import failed: {}'.format(repr(e)))
+                print(f'Variable import failed: {repr(e)}')
                 fail_count += 1
             else:
                 suc_count += 1
diff --git a/airflow/configuration.py b/airflow/configuration.py
index ecd2bc6..5b765de 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -536,7 +536,7 @@ class AirflowConfigParser(ConfigParser):  # pylint: disable=too-many-ancestors
         # This is based on the configparser.RawConfigParser.write method code to add support for
         # reading options from environment variables.
         if space_around_delimiters:
-            delimiter = " {} ".format(self._delimiters[0])
+            delimiter = f" {self._delimiters[0]} "
         else:
             delimiter = self._delimiters[0]
         if self._defaults:
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index 8eaadd7..456def2 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -52,7 +52,7 @@ def my_py_command(test_mode, params):
             )
         )
     # Print out the value of "miff", passed in below via the Python Operator
-    print(" 'miff' was passed in via task params = {}".format(params["miff"]))
+    print(f" 'miff' was passed in via task params = {params['miff']}")
     return 1
 
 
@@ -83,8 +83,8 @@ def print_env_vars(test_mode):
     --env-vars '{"foo":"bar"}'`
     """
     if test_mode:
-        print("foo={}".format(os.environ.get('foo')))
-        print("AIRFLOW_TEST_MODE={}".format(os.environ.get('AIRFLOW_TEST_MODE')))
+        print(f"foo={os.environ.get('foo')}")
+        print(f"AIRFLOW_TEST_MODE={os.environ.get('AIRFLOW_TEST_MODE')}")
 
 
 env_var_test_task = PythonOperator(task_id='env_var_test_task', python_callable=print_env_vars, dag=dag)
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 0355275..f431dc4 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -43,7 +43,7 @@ def run_this_func(**context):
     :param context: The execution context
     :type context: dict
     """
-    print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"]))
+    print(f"Remotely received value of {context['dag_run'].conf['message']} for key=message")
 
 
 run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)
diff --git a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py
index 6a30415..849b294 100644
--- a/airflow/example_dags/subdags/subdag.py
+++ b/airflow/example_dags/subdags/subdag.py
@@ -43,7 +43,7 @@ def subdag(parent_dag_name, child_dag_name, args):
 
     for i in range(5):
         DummyOperator(
-            task_id='{}-task-{}'.format(child_dag_name, i + 1),
+            task_id=f'{child_dag_name}-task-{i + 1}',
             default_args=args,
             dag=dag_subdag,
         )
diff --git a/airflow/example_dags/tutorial_taskflow_api_etl.py b/airflow/example_dags/tutorial_taskflow_api_etl.py
index e50ae5f..cfcfbd9 100644
--- a/airflow/example_dags/tutorial_taskflow_api_etl.py
+++ b/airflow/example_dags/tutorial_taskflow_api_etl.py
@@ -91,7 +91,7 @@ def tutorial_taskflow_api_etl():
         instead of saving it to end user review, just prints it out.
         """
 
-        print("Total order value is: %.2f" % total_order_value)
+        print(f"Total order value is: {total_order_value:.2f}")
 
     # [END load]
 
diff --git a/airflow/hooks/dbapi.py b/airflow/hooks/dbapi.py
index 9e340e4..9821643 100644
--- a/airflow/hooks/dbapi.py
+++ b/airflow/hooks/dbapi.py
@@ -248,7 +248,7 @@ class DbApiHook(BaseHook):
             sql = "INSERT INTO "
         else:
             sql = "REPLACE INTO "
-        sql += "{} {} VALUES ({})".format(table, target_fields, ",".join(placeholders))
+        sql += f"{table} {target_fields} VALUES ({','.join(placeholders)})"
         return sql
 
     def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs):
diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py
index 0004cac..023cd32 100644
--- a/airflow/kubernetes/refresh_config.py
+++ b/airflow/kubernetes/refresh_config.py
@@ -61,7 +61,7 @@ class RefreshKubeConfigLoader(KubeConfigLoader):
             if 'token' not in status:
                 logging.error('exec: missing token field in plugin output')
                 return None
-            self.token = "Bearer %s" % status['token']  # pylint: disable=W0201
+            self.token = f"Bearer {status['token']}"  # pylint: disable=W0201
             ts_str = status.get('expirationTimestamp')
             if ts_str:
                 self.api_key_expire_ts = _parse_timestamp(ts_str)
diff --git a/airflow/models/connection.py b/airflow/models/connection.py
index 1159a44..4edd6b7 100644
--- a/airflow/models/connection.py
+++ b/airflow/models/connection.py
@@ -165,7 +165,7 @@ class Connection(Base, LoggingMixin):  # pylint: disable=too-many-instance-attri
 
     def get_uri(self) -> str:
         """Return connection in URI format"""
-        uri = '{}://'.format(str(self.conn_type).lower().replace('_', '-'))
+        uri = f"{str(self.conn_type).lower().replace('_', '-')}://"
 
         authority_block = ''
         if self.login is not None:
@@ -190,12 +190,12 @@ class Connection(Base, LoggingMixin):  # pylint: disable=too-many-instance-attri
                 host_block += f'@:{self.port}'
 
         if self.schema:
-            host_block += '/{}'.format(quote(self.schema, safe=''))
+            host_block += f"/{quote(self.schema, safe='')}"
 
         uri += host_block
 
         if self.extra_dejson:
-            uri += '?{}'.format(urlencode(self.extra_dejson))
+            uri += f'?{urlencode(self.extra_dejson)}'
 
         return uri
 
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 8bb32db..15332f3 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1538,7 +1538,7 @@ class DAG(LoggingMixin):
             dttm = timezone.utcnow()
             pickled = pickle.dumps(self)
             d['pickle_len'] = len(pickled)
-            d['pickling_duration'] = "{}".format(timezone.utcnow() - dttm)
+            d['pickling_duration'] = str(timezone.utcnow() - dttm)
         except Exception as e:
             self.log.debug(e)
             d['is_picklable'] = False
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index d671a01..3c9f53a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1595,9 +1595,7 @@ class TaskInstance(Base, LoggingMixin):  # pylint: disable=R0902,R0904
         yesterday_ds_nodash = yesterday_ds.replace('-', '')
         tomorrow_ds_nodash = tomorrow_ds.replace('-', '')
 
-        ti_key_str = "{dag_id}__{task_id}__{ds_nodash}".format(
-            dag_id=task.dag_id, task_id=task.task_id, ds_nodash=ds_nodash
-        )
+        ti_key_str = f"{task.dag_id}__{task.task_id}__{ds_nodash}"
 
         if task.params:
             params.update(task.params)
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index 6428a40..844e38f 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -71,9 +71,7 @@ class BaseXCom(Base, LoggingMixin):
             self.value = pickle.loads(self.value)
 
     def __repr__(self):
-        return '<XCom "{key}" ({task_id} @ {execution_date})>'.format(
-            key=self.key, task_id=self.task_id, execution_date=self.execution_date
-        )
+        return f'<XCom "{self.key}" ({self.task_id} @ {self.execution_date})>'
 
     @classmethod
     @provide_session
diff --git a/airflow/operators/sql.py b/airflow/operators/sql.py
index 59c2e60..00f7e13 100644
--- a/airflow/operators/sql.py
+++ b/airflow/operators/sql.py
@@ -293,9 +293,7 @@ class SQLIntervalCheckOperator(BaseOperator):
         self.days_back = -abs(days_back)
         self.conn_id = conn_id
         sqlexp = ", ".join(self.metrics_sorted)
-        sqlt = "SELECT {sqlexp} FROM {table} WHERE {date_filter_column}=".format(
-            sqlexp=sqlexp, table=table, date_filter_column=date_filter_column
-        )
+        sqlt = f"SELECT {sqlexp} FROM {table} WHERE {date_filter_column}="
 
         self.sql1 = sqlt + "'{{ ds }}'"
         self.sql2 = sqlt + "'{{ macros.ds_add(ds, " + str(self.days_back) + ") }}'"
@@ -360,9 +358,7 @@ class SQLIntervalCheckOperator(BaseOperator):
                     ratios[k],
                     self.metrics_thresholds[k],
                 )
-            raise AirflowException(
-                "The following tests have failed:\n {}".format(", ".join(sorted(failed_tests)))
-            )
+            raise AirflowException(f"The following tests have failed:\n {', '.join(sorted(failed_tests))}")
 
         self.log.info("All tests have passed")
 
@@ -535,7 +531,7 @@ class BranchSQLOperator(BaseOperator, SkipMixin):
         self._hook = self._get_hook()
 
         if self._hook is None:
-            raise AirflowException("Failed to establish connection to '%s'" % self.conn_id)
+            raise AirflowException(f"Failed to establish connection to '{self.conn_id}'")
 
         if self.sql is None:
             raise AirflowException("Expected 'sql' parameter is missing.")
@@ -584,14 +580,14 @@ class BranchSQLOperator(BaseOperator, SkipMixin):
                     follow_branch = self.follow_task_ids_if_true
             else:
                 raise AirflowException(
-                    "Unexpected query return result '{}' type '{}'".format(query_result, type(query_result))
+                    f"Unexpected query return result '{query_result}' type '{type(query_result)}'"
                 )
 
             if follow_branch is None:
                 follow_branch = self.follow_task_ids_if_false
         except ValueError:
             raise AirflowException(
-                "Unexpected query return result '{}' type '{}'".format(query_result, type(query_result))
+                f"Unexpected query return result '{query_result}' type '{type(query_result)}'"
             )
 
         self.skip_all_except(context["ti"], follow_branch)
diff --git a/airflow/providers/amazon/aws/hooks/datasync.py b/airflow/providers/amazon/aws/hooks/datasync.py
index 4925529..3c82749 100644
--- a/airflow/providers/amazon/aws/hooks/datasync.py
+++ b/airflow/providers/amazon/aws/hooks/datasync.py
@@ -57,7 +57,7 @@ class AWSDataSyncHook(AwsBaseHook):
         self.tasks: list = []
         # wait_interval_seconds = 0 is used during unit tests
         if wait_interval_seconds < 0 or wait_interval_seconds > 15 * 60:
-            raise ValueError("Invalid wait_interval_seconds %s" % wait_interval_seconds)
+            raise ValueError(f"Invalid wait_interval_seconds {wait_interval_seconds}")
         self.wait_interval_seconds = wait_interval_seconds
 
     def create_location(self, location_uri: str, **create_location_kwargs) -> str:
@@ -314,4 +314,4 @@ class AWSDataSyncHook(AwsBaseHook):
             return False
         if iterations <= 0:
             raise AirflowTaskTimeout("Max iterations exceeded!")
-        raise AirflowException("Unknown status: %s" % status)  # Should never happen
+        raise AirflowException(f"Unknown status: {status}")  # Should never happen
diff --git a/airflow/providers/amazon/aws/hooks/dynamodb.py b/airflow/providers/amazon/aws/hooks/dynamodb.py
index a829f8d..a66f2b0 100644
--- a/airflow/providers/amazon/aws/hooks/dynamodb.py
+++ b/airflow/providers/amazon/aws/hooks/dynamodb.py
@@ -58,6 +58,4 @@ class AwsDynamoDBHook(AwsBaseHook):
                     batch.put_item(Item=item)
             return True
         except Exception as general_error:
-            raise AirflowException(
-                "Failed to insert items in dynamodb, error: {error}".format(error=str(general_error))
-            )
+            raise AirflowException(f"Failed to insert items in dynamodb, error: {str(general_error)}")
diff --git a/airflow/providers/amazon/aws/hooks/sagemaker.py b/airflow/providers/amazon/aws/hooks/sagemaker.py
index ab5fdd1..d6548ad 100644
--- a/airflow/providers/amazon/aws/hooks/sagemaker.py
+++ b/airflow/providers/amazon/aws/hooks/sagemaker.py
@@ -126,7 +126,7 @@ def secondary_training_status_message(
     for transition in transitions_to_print:
         message = transition['StatusMessage']
         time_str = timezone.convert_to_utc(job_description['LastModifiedTime']).strftime('%Y-%m-%d %H:%M:%S')
-        status_strs.append('{} {} - {}'.format(time_str, transition['Status'], message))
+        status_strs.append(f"{time_str} {transition['Status']} - {message}")
 
     return '\n'.join(status_strs)
 
@@ -740,7 +740,7 @@ class SageMakerHook(AwsBaseHook):  # pylint: disable=too-many-public-methods
             if status in non_terminal_states:
                 running = True
             elif status in self.failed_states:
-                raise AirflowException('SageMaker job failed because %s' % response['FailureReason'])
+                raise AirflowException(f"SageMaker job failed because {response['FailureReason']}")
             else:
                 running = False
 
diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py
index d6e5326..7fdeac3 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -118,7 +118,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
             log_exists = self.s3_log_exists(remote_loc)
         except Exception as error:  # pylint: disable=broad-except
             self.log.exception(error)
-            log = '*** Failed to verify remote log exists {}.\n{}\n'.format(remote_loc, str(error))
+            log = f'*** Failed to verify remote log exists {remote_loc}.\n{str(error)}\n'
 
         if log_exists:
             # If S3 remote file exists, we do not fetch logs from task instance
diff --git a/airflow/providers/amazon/aws/operators/datasync.py b/airflow/providers/amazon/aws/operators/datasync.py
index fab6898..f5c129a 100644
--- a/airflow/providers/amazon/aws/operators/datasync.py
+++ b/airflow/providers/amazon/aws/operators/datasync.py
@@ -351,7 +351,7 @@ class AWSDataSyncOperator(BaseOperator):
                     self.log.log(level, '%s=%s', k, v)
 
         if not result:
-            raise AirflowException("Failed TaskExecutionArn %s" % self.task_execution_arn)
+            raise AirflowException(f"Failed TaskExecutionArn {self.task_execution_arn}")
 
     def on_kill(self) -> None:
         """Cancel the submitted DataSync task."""
diff --git a/airflow/providers/amazon/aws/operators/emr_add_steps.py b/airflow/providers/amazon/aws/operators/emr_add_steps.py
index 44bc20c..2ffd5cc 100644
--- a/airflow/providers/amazon/aws/operators/emr_add_steps.py
+++ b/airflow/providers/amazon/aws/operators/emr_add_steps.py
@@ -100,7 +100,7 @@ class EmrAddStepsOperator(BaseOperator):
         response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps)
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
-            raise AirflowException('Adding steps failed: %s' % response)
+            raise AirflowException(f'Adding steps failed: {response}')
         else:
             self.log.info('Steps %s added to JobFlow', response['StepIds'])
             return response['StepIds']
diff --git a/airflow/providers/amazon/aws/operators/emr_create_job_flow.py b/airflow/providers/amazon/aws/operators/emr_create_job_flow.py
index b3b6808..d8dc31e 100644
--- a/airflow/providers/amazon/aws/operators/emr_create_job_flow.py
+++ b/airflow/providers/amazon/aws/operators/emr_create_job_flow.py
@@ -78,7 +78,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
         response = emr.create_job_flow(job_flow_overrides)
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
-            raise AirflowException('JobFlow creation failed: %s' % response)
+            raise AirflowException(f'JobFlow creation failed: {response}')
         else:
             self.log.info('JobFlow with id %s created', response['JobFlowId'])
             return response['JobFlowId']
diff --git a/airflow/providers/amazon/aws/operators/emr_modify_cluster.py b/airflow/providers/amazon/aws/operators/emr_modify_cluster.py
index f0e4693..a04e845 100644
--- a/airflow/providers/amazon/aws/operators/emr_modify_cluster.py
+++ b/airflow/providers/amazon/aws/operators/emr_modify_cluster.py
@@ -66,7 +66,7 @@ class EmrModifyClusterOperator(BaseOperator):
         )
 
         if response['ResponseMetadata']['HTTPStatusCode'] != 200:
-            raise AirflowException('Modify cluster failed: %s' % response)
+            raise AirflowException(f'Modify cluster failed: {response}')
         else:
             self.log.info('Steps concurrency level %d', response['StepConcurrencyLevel'])
             return response['StepConcurrencyLevel']
diff --git a/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py b/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py
index 0e7e17f..9d75eaf 100644
--- a/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py
+++ b/airflow/providers/amazon/aws/operators/emr_terminate_job_flow.py
@@ -51,6 +51,6 @@ class EmrTerminateJobFlowOperator(BaseOperator):
         response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
-            raise AirflowException('JobFlow termination failed: %s' % response)
+            raise AirflowException(f'JobFlow termination failed: {response}')
         else:
             self.log.info('JobFlow with id %s terminated', self.job_flow_id)
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py b/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py
index 53cfd93..35b0b11 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_endpoint.py
@@ -150,7 +150,7 @@ class SageMakerEndpointOperator(SageMakerBaseOperator):
             )
 
         if response['ResponseMetadata']['HTTPStatusCode'] != 200:
-            raise AirflowException('Sagemaker endpoint creation failed: %s' % response)
+            raise AirflowException(f'Sagemaker endpoint creation failed: {response}')
         else:
             return {
                 'EndpointConfig': self.hook.describe_endpoint_config(endpoint_info['EndpointConfigName']),
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py b/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py
index bbf2be1..a2add7b 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_endpoint_config.py
@@ -49,6 +49,6 @@ class SageMakerEndpointConfigOperator(SageMakerBaseOperator):
         self.log.info('Creating SageMaker Endpoint Config %s.', self.config['EndpointConfigName'])
         response = self.hook.create_endpoint_config(self.config)
         if response['ResponseMetadata']['HTTPStatusCode'] != 200:
-            raise AirflowException('Sagemaker endpoint config creation failed: %s' % response)
+            raise AirflowException(f'Sagemaker endpoint config creation failed: {response}')
         else:
             return {'EndpointConfig': self.hook.describe_endpoint_config(self.config['EndpointConfigName'])}
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_model.py b/airflow/providers/amazon/aws/operators/sagemaker_model.py
index 25730ea..0e8cbf4 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_model.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_model.py
@@ -53,6 +53,6 @@ class SageMakerModelOperator(SageMakerBaseOperator):
         self.log.info('Creating SageMaker Model %s.', self.config['ModelName'])
         response = self.hook.create_model(self.config)
         if response['ResponseMetadata']['HTTPStatusCode'] != 200:
-            raise AirflowException('Sagemaker model creation failed: %s' % response)
+            raise AirflowException(f'Sagemaker model creation failed: {response}')
         else:
             return {'Model': self.hook.describe_model(self.config['ModelName'])}
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_processing.py b/airflow/providers/amazon/aws/operators/sagemaker_processing.py
index e56a987..271b46b 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_processing.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_processing.py
@@ -119,5 +119,5 @@ class SageMakerProcessingOperator(SageMakerBaseOperator):
             max_ingestion_time=self.max_ingestion_time,
         )
         if response['ResponseMetadata']['HTTPStatusCode'] != 200:
-            raise AirflowException('Sagemaker Processing Job creation failed: %s' % response)
+            raise AirflowException(f'Sagemaker Processing Job creation failed: {response}')
         return {'Processing': self.hook.describe_processing_job(self.config['ProcessingJobName'])}
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_training.py b/airflow/providers/amazon/aws/operators/sagemaker_training.py
index 29c34f6..7d9eaf2 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_training.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_training.py
@@ -117,6 +117,6 @@ class SageMakerTrainingOperator(SageMakerBaseOperator):
             max_ingestion_time=self.max_ingestion_time,
         )
         if response['ResponseMetadata']['HTTPStatusCode'] != 200:
-            raise AirflowException('Sagemaker Training Job creation failed: %s' % response)
+            raise AirflowException(f'Sagemaker Training Job creation failed: {response}')
         else:
             return {'Training': self.hook.describe_training_job(self.config['TrainingJobName'])}
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_transform.py b/airflow/providers/amazon/aws/operators/sagemaker_transform.py
index 7caf9f1..b264d2d 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_transform.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_transform.py
@@ -116,7 +116,7 @@ class SageMakerTransformOperator(SageMakerBaseOperator):
             max_ingestion_time=self.max_ingestion_time,
         )
         if response['ResponseMetadata']['HTTPStatusCode'] != 200:
-            raise AirflowException('Sagemaker transform Job creation failed: %s' % response)
+            raise AirflowException(f'Sagemaker transform Job creation failed: {response}')
         else:
             return {
                 'Model': self.hook.describe_model(transform_config['ModelName']),
diff --git a/airflow/providers/amazon/aws/operators/sagemaker_tuning.py b/airflow/providers/amazon/aws/operators/sagemaker_tuning.py
index f8df36a..38664a8 100644
--- a/airflow/providers/amazon/aws/operators/sagemaker_tuning.py
+++ b/airflow/providers/amazon/aws/operators/sagemaker_tuning.py
@@ -92,6 +92,6 @@ class SageMakerTuningOperator(SageMakerBaseOperator):
             max_ingestion_time=self.max_ingestion_time,
         )
         if response['ResponseMetadata']['HTTPStatusCode'] != 200:
-            raise AirflowException('Sagemaker Tuning Job creation failed: %s' % response)
+            raise AirflowException(f'Sagemaker Tuning Job creation failed: {response}')
         else:
             return {'Tuning': self.hook.describe_tuning_job(self.config['HyperParameterTuningJobName'])}
diff --git a/airflow/providers/amazon/aws/sensors/sagemaker_base.py b/airflow/providers/amazon/aws/sensors/sagemaker_base.py
index 16c8cd7..6572122 100644
--- a/airflow/providers/amazon/aws/sensors/sagemaker_base.py
+++ b/airflow/providers/amazon/aws/sensors/sagemaker_base.py
@@ -63,7 +63,7 @@ class SageMakerBaseSensor(BaseSensorOperator):
 
         if state in self.failed_states():
             failed_reason = self.get_failed_reason_from_response(response)
-            raise AirflowException('Sagemaker job failed for the following reason: %s' % failed_reason)
+            raise AirflowException(f'Sagemaker job failed for the following reason: {failed_reason}')
         return True
 
     def non_terminal_states(self) -> Set[str]:
diff --git a/airflow/providers/apache/druid/hooks/druid.py b/airflow/providers/apache/druid/hooks/druid.py
index b4af207..69d33a1 100644
--- a/airflow/providers/apache/druid/hooks/druid.py
+++ b/airflow/providers/apache/druid/hooks/druid.py
@@ -68,9 +68,7 @@ class DruidHook(BaseHook):
         port = conn.port
         conn_type = 'http' if not conn.conn_type else conn.conn_type
         endpoint = conn.extra_dejson.get('endpoint', '')
-        return "{conn_type}://{host}:{port}/{endpoint}".format(
-            conn_type=conn_type, host=host, port=port, endpoint=endpoint
-        )
+        return f"{conn_type}://{host}:{port}/{endpoint}"
 
     def get_auth(self) -> Optional[requests.auth.HTTPBasicAuth]:
         """
diff --git a/airflow/providers/apache/hdfs/sensors/hdfs.py b/airflow/providers/apache/hdfs/sensors/hdfs.py
index 65cfb5b..867c193 100644
--- a/airflow/providers/apache/hdfs/sensors/hdfs.py
+++ b/airflow/providers/apache/hdfs/sensors/hdfs.py
@@ -141,7 +141,7 @@ class HdfsRegexSensor(HdfsSensor):
         result = [
             f
             for f in sb_client.ls([self.filepath], include_toplevel=False)
-            if f['file_type'] == 'f' and self.regex.match(f['path'].replace('%s/' % self.filepath, ''))
+            if f['file_type'] == 'f' and self.regex.match(f['path'].replace(f'{self.filepath}/', ''))
         ]
         result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
         result = self.filter_for_filesize(result, self.file_size)
diff --git a/airflow/providers/apache/hive/hooks/hive.py b/airflow/providers/apache/hive/hooks/hive.py
index ab7b7b7..41f5cff 100644
--- a/airflow/providers/apache/hive/hooks/hive.py
+++ b/airflow/providers/apache/hive/hooks/hive.py
@@ -133,9 +133,7 @@ class HiveCliHook(BaseHook):
 
         if self.use_beeline:
             hive_bin = 'beeline'
-            jdbc_url = "jdbc:hive2://{host}:{port}/{schema}".format(
-                host=conn.host, port=conn.port, schema=conn.schema
-            )
+            jdbc_url = f"jdbc:hive2://{conn.host}:{conn.port}/{conn.schema}"
             if conf.get('core', 'security') == 'kerberos':
                 template = conn.extra_dejson.get('principal', "hive/_HOST@EXAMPLE.COM")
                 if "_HOST" in template:
@@ -143,9 +141,7 @@ class HiveCliHook(BaseHook):
 
                 proxy_user = self._get_proxy_user()
 
-                jdbc_url += ";principal={template};{proxy_user}".format(
-                    template=template, proxy_user=proxy_user
-                )
+                jdbc_url += f";principal={template};{proxy_user}"
             elif self.auth:
                 jdbc_url += ";auth=" + self.auth
 
diff --git a/airflow/providers/apache/hive/operators/hive_stats.py b/airflow/providers/apache/hive/operators/hive_stats.py
index d4de591..88faa40 100644
--- a/airflow/providers/apache/hive/operators/hive_stats.py
+++ b/airflow/providers/apache/hive/operators/hive_stats.py
@@ -135,9 +135,7 @@ class HiveStatsCollectionOperator(BaseOperator):
 
         where_clause_ = [f"{k} = '{v}'" for k, v in self.partition.items()]
         where_clause = " AND\n        ".join(where_clause_)
-        sql = "SELECT {exprs_str} FROM {table} WHERE {where_clause};".format(
-            exprs_str=exprs_str, table=self.table, where_clause=where_clause
-        )
+        sql = f"SELECT {exprs_str} FROM {self.table} WHERE {where_clause};"
 
         presto = PrestoHook(presto_conn_id=self.presto_conn_id)
         self.log.info('Executing SQL check: %s', sql)
@@ -150,26 +148,22 @@ class HiveStatsCollectionOperator(BaseOperator):
 
         self.log.info("Deleting rows from previous runs if they exist")
         mysql = MySqlHook(self.mysql_conn_id)
-        sql = """
+        sql = f"""
         SELECT 1 FROM hive_stats
         WHERE
-            table_name='{table}' AND
+            table_name='{self.table}' AND
             partition_repr='{part_json}' AND
-            dttm='{dttm}'
+            dttm='{self.dttm}'
         LIMIT 1;
-        """.format(
-            table=self.table, part_json=part_json, dttm=self.dttm
-        )
+        """
         if mysql.get_records(sql):
-            sql = """
+            sql = f"""
             DELETE FROM hive_stats
             WHERE
-                table_name='{table}' AND
+                table_name='{self.table}' AND
                 partition_repr='{part_json}' AND
-                dttm='{dttm}';
-            """.format(
-                table=self.table, part_json=part_json, dttm=self.dttm
-            )
+                dttm='{self.dttm}';
+            """
             mysql.run(sql)
 
         self.log.info("Pivoting and loading cells into the Airflow db")
diff --git a/airflow/providers/apache/spark/hooks/spark_jdbc.py b/airflow/providers/apache/spark/hooks/spark_jdbc.py
index 7a22c66..f90cea2 100644
--- a/airflow/providers/apache/spark/hooks/spark_jdbc.py
+++ b/airflow/providers/apache/spark/hooks/spark_jdbc.py
@@ -207,7 +207,7 @@ class SparkJDBCHook(SparkSubmitHook):
         if self._jdbc_connection['url']:
             arguments += [
                 '-url',
-                "{}{}/{}".format(jdbc_conn['conn_prefix'], jdbc_conn['url'], jdbc_conn['schema']),
+                f"{jdbc_conn['conn_prefix']}{jdbc_conn['url']}/{jdbc_conn['schema']}",
             ]
         if self._jdbc_connection['user']:
             arguments += ['-user', self._jdbc_connection['user']]
diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py
index ac1a83a..e7f2186 100644
--- a/airflow/providers/apache/spark/hooks/spark_submit.py
+++ b/airflow/providers/apache/spark/hooks/spark_submit.py
@@ -291,7 +291,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         connection_cmd += ["--master", self._connection['master']]
 
         for key in self._conf:
-            connection_cmd += ["--conf", "{}={}".format(key, str(self._conf[key]))]
+            connection_cmd += ["--conf", f"{key}={str(self._conf[key])}"]
         if self._env_vars and (self._is_kubernetes or self._is_yarn):
             if self._is_yarn:
                 tmpl = "spark.yarn.appMasterEnv.{}={}"
@@ -308,7 +308,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         if self._is_kubernetes and self._connection['namespace']:
             connection_cmd += [
                 "--conf",
-                "spark.kubernetes.namespace={}".format(self._connection['namespace']),
+                f"spark.kubernetes.namespace={self._connection['namespace']}",
             ]
         if self._files:
             connection_cmd += ["--files", self._files]
@@ -378,9 +378,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
                 "/usr/bin/curl",
                 "--max-time",
                 str(curl_max_wait_time),
-                "{host}/v1/submissions/status/{submission_id}".format(
-                    host=spark_host, submission_id=self._driver_id
-                ),
+                f"{spark_host}/v1/submissions/status/{self._driver_id}",
             ]
             self.log.info(connection_cmd)
 
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index cf27713..ca82918 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -29,7 +29,7 @@ def _load_body_to_dict(body):
     try:
         body_dict = yaml.safe_load(body)
     except yaml.YAMLError as e:
-        raise AirflowException("Exception when loading resource definition: %s\n" % e)
+        raise AirflowException(f"Exception when loading resource definition: {e}\n")
     return body_dict
 
 
@@ -169,7 +169,7 @@ class KubernetesHook(BaseHook):
             self.log.debug("Response: %s", response)
             return response
         except client.rest.ApiException as e:
-            raise AirflowException("Exception when calling -> create_custom_object: %s\n" % e)
+            raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n")
 
     def get_custom_object(
         self, group: str, version: str, plural: str, name: str, namespace: Optional[str] = None
@@ -197,7 +197,7 @@ class KubernetesHook(BaseHook):
             )
             return response
         except client.rest.ApiException as e:
-            raise AirflowException("Exception when calling -> get_custom_object: %s\n" % e)
+            raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n")
 
     def get_namespace(self) -> str:
         """Returns the namespace that defined in the connection"""
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 3f42ab1..7b4022e 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -324,8 +324,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: disable=too-many-instance-
 
             if len(pod_list.items) > 1 and self.reattach_on_restart:
                 raise AirflowException(
-                    'More than one pod running with labels: '
-                    '{label_selector}'.format(label_selector=label_selector)
+                    f'More than one pod running with labels: {label_selector}'
                 )
 
             launcher = pod_launcher.PodLauncher(kube_client=client, extract_xcom=self.do_xcom_push)
diff --git a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
index 2fa3401..eb555f1 100644
--- a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
@@ -106,7 +106,7 @@ class SparkKubernetesSensor(BaseSensorOperator):
         if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES:
             self._log_driver(application_state, response)
         if application_state in self.FAILURE_STATES:
-            raise AirflowException("Spark application failed with state: %s" % application_state)
+            raise AirflowException(f"Spark application failed with state: {application_state}")
         elif application_state in self.SUCCESS_STATES:
             self.log.info("Spark application ended successfully")
             return True
diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py
index cc0fbc6..28953b9 100644
--- a/airflow/providers/databricks/hooks/databricks.py
+++ b/airflow/providers/databricks/hooks/databricks.py
@@ -178,7 +178,7 @@ class DatabricksHook(BaseHook):  # noqa
             auth = (self.databricks_conn.login, self.databricks_conn.password)
             host = self.databricks_conn.host
 
-        url = 'https://{host}/{endpoint}'.format(host=self._parse_host(host), endpoint=endpoint)
+        url = f'https://{self._parse_host(host)}/{endpoint}'
 
         if method == 'GET':
             request_func = requests.get
diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py
index 18ecf9c..1098d98 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -126,7 +126,7 @@ class DockerSwarmOperator(DockerOperator):
                 restart_policy=types.RestartPolicy(condition='none'),
                 resources=types.Resources(mem_limit=self.mem_limit),
             ),
-            name='airflow-%s' % get_random_string(),
+            name=f'airflow-{get_random_string()}',
             labels={'name': f'airflow__{self.dag_id}__{self.task_id}'},
         )
 
diff --git a/airflow/providers/ftp/hooks/ftp.py b/airflow/providers/ftp/hooks/ftp.py
index 66a9080..6c3c064 100644
--- a/airflow/providers/ftp/hooks/ftp.py
+++ b/airflow/providers/ftp/hooks/ftp.py
@@ -180,7 +180,7 @@ class FTPHook(BaseHook):
         remote_path, remote_file_name = os.path.split(remote_full_path)
         conn.cwd(remote_path)
         self.log.info('Retrieving file from FTP: %s', remote_full_path)
-        conn.retrbinary('RETR %s' % remote_file_name, callback)
+        conn.retrbinary(f'RETR {remote_file_name}', callback)
         self.log.info('Finished retrieving file from FTP: %s', remote_full_path)
 
         if is_path and output_handle:
@@ -210,7 +210,7 @@ class FTPHook(BaseHook):
             input_handle = local_full_path_or_buffer
         remote_path, remote_file_name = os.path.split(remote_full_path)
         conn.cwd(remote_path)
-        conn.storbinary('STOR %s' % remote_file_name, input_handle)
+        conn.storbinary(f'STOR {remote_file_name}', input_handle)
 
         if is_path:
             input_handle.close()
diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py
index b7f46d9..0b3c3b3 100644
--- a/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/airflow/providers/google/cloud/hooks/bigquery.py
@@ -2843,7 +2843,7 @@ def _split_tablename(
     cmpt = rest.split('.')
     if len(cmpt) == 3:
         if project_id:
-            raise ValueError("{var}Use either : or . to specify project".format(var=var_print(var_name)))
+            raise ValueError(f"{var_print(var_name)}Use either : or . to specify project")
         project_id = cmpt[0]
         dataset_id = cmpt[1]
         table_id = cmpt[2]
@@ -2887,7 +2887,7 @@ def _cleanse_time_partitioning(
 def _validate_value(key: Any, value: Any, expected_type: Type) -> None:
     """Function to check expected type and raise error if type is not correct"""
     if not isinstance(value, expected_type):
-        raise TypeError("{} argument must have a type {} not {}".format(key, expected_type, type(value)))
+        raise TypeError(f"{key} argument must have a type {expected_type} not {type(value)}")
 
 
 def _api_resource_configs_duplication_check(
diff --git a/airflow/providers/google/cloud/hooks/compute.py b/airflow/providers/google/cloud/hooks/compute.py
index ab84241..c4da00a 100644
--- a/airflow/providers/google/cloud/hooks/compute.py
+++ b/airflow/providers/google/cloud/hooks/compute.py
@@ -99,9 +99,7 @@ class ComputeEngineHook(GoogleBaseHook):
         try:
             operation_name = response["name"]
         except KeyError:
-            raise AirflowException(
-                "Wrong response '{}' returned - it should contain " "'name' field".format(response)
-            )
+            raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field")
         self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)
 
     @GoogleBaseHook.fallback_to_default_project_id
@@ -130,9 +128,7 @@ class ComputeEngineHook(GoogleBaseHook):
         try:
             operation_name = response["name"]
         except KeyError:
-            raise AirflowException(
-                "Wrong response '{}' returned - it should contain " "'name' field".format(response)
-            )
+            raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field")
         self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)
 
     @GoogleBaseHook.fallback_to_default_project_id
@@ -159,9 +155,7 @@ class ComputeEngineHook(GoogleBaseHook):
         try:
             operation_name = response["name"]
         except KeyError:
-            raise AirflowException(
-                "Wrong response '{}' returned - it should contain " "'name' field".format(response)
-            )
+            raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field")
         self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)
 
     def _execute_set_machine_type(self, zone: str, resource_id: str, body: dict, project_id: str) -> dict:
@@ -233,9 +227,7 @@ class ComputeEngineHook(GoogleBaseHook):
         try:
             operation_name = response["name"]
         except KeyError:
-            raise AirflowException(
-                "Wrong response '{}' returned - it should contain " "'name' field".format(response)
-            )
+            raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field")
         self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)
 
     @GoogleBaseHook.fallback_to_default_project_id
@@ -318,9 +310,7 @@ class ComputeEngineHook(GoogleBaseHook):
         try:
             operation_name = response["name"]
         except KeyError:
-            raise AirflowException(
-                "Wrong response '{}' returned - it should contain " "'name' field".format(response)
-            )
+            raise AirflowException(f"Wrong response '{response}' returned - it should contain 'name' field")
         self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)
 
     def _wait_for_operation_to_complete(
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index 0a665d4..da3e49c 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -385,21 +385,19 @@ class _DataflowJobsController(LoggingMixin):
         if job['currentState'] == DataflowJobStatus.JOB_STATE_DONE:
             return True
         elif job['currentState'] == DataflowJobStatus.JOB_STATE_FAILED:
-            raise Exception("Google Cloud Dataflow job {} has failed.".format(job['name']))
+            raise Exception(f"Google Cloud Dataflow job {job['name']} has failed.")
         elif job['currentState'] == DataflowJobStatus.JOB_STATE_CANCELLED:
-            raise Exception("Google Cloud Dataflow job {} was cancelled.".format(job['name']))
+            raise Exception(f"Google Cloud Dataflow job {job['name']} was cancelled.")
         elif job['currentState'] == DataflowJobStatus.JOB_STATE_DRAINED:
-            raise Exception("Google Cloud Dataflow job {} was drained.".format(job['name']))
+            raise Exception(f"Google Cloud Dataflow job {job['name']} was drained.")
         elif job['currentState'] == DataflowJobStatus.JOB_STATE_UPDATED:
-            raise Exception("Google Cloud Dataflow job {} was updated.".format(job['name']))
+            raise Exception(f"Google Cloud Dataflow job {job['name']} was updated.")
         elif job['currentState'] == DataflowJobStatus.JOB_STATE_RUNNING and wait_for_running:
             return True
         elif job['currentState'] in DataflowJobStatus.AWAITING_STATES:
             return self._wait_until_finished is False
         self.log.debug("Current job: %s", str(job))
-        raise Exception(
-            "Google Cloud Dataflow job {} was unknown state: {}".format(job["name"], job["currentState"])
-        )
+        raise Exception(f"Google Cloud Dataflow job {job['name']} was unknown state: {job['currentState']}")
 
     def wait_for_done(self) -> None:
         """Helper method to wait for result of submitted job."""
diff --git a/airflow/providers/google/cloud/hooks/functions.py b/airflow/providers/google/cloud/hooks/functions.py
index 8cb93df..49a6e2c 100644
--- a/airflow/providers/google/cloud/hooks/functions.py
+++ b/airflow/providers/google/cloud/hooks/functions.py
@@ -216,9 +216,7 @@ class CloudFunctionsHook(GoogleBaseHook):
         :type project_id: str
         :return: None
         """
-        name = "projects/{project_id}/locations/{location}/functions/{function_id}".format(
-            project_id=project_id, location=location, function_id=function_id
-        )
+        name = f"projects/{project_id}/locations/{location}/functions/{function_id}"
         # fmt: off
         response = self.get_conn().projects().locations().functions().call(  # pylint: disable=no-member
             name=name,
diff --git a/airflow/providers/google/cloud/hooks/kubernetes_engine.py b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
index 930c1cd..604815d 100644
--- a/airflow/providers/google/cloud/hooks/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
@@ -99,7 +99,7 @@ class GKEHook(GoogleBaseHook):
             if operation.status == Operation.Status.RUNNING or operation.status == Operation.Status.PENDING:
                 time.sleep(OPERATIONAL_POLL_INTERVAL)
             else:
-                raise exceptions.GoogleCloudError("Operation has failed with status: %s" % operation.status)
+                raise exceptions.GoogleCloudError(f"Operation has failed with status: {operation.status}")
             # To update status of operation
             operation = self.get_operation(operation.name, project_id=project_id or self.project_id)
         return operation
diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py
index 9fd456d..b57a18a 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -156,7 +156,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
             log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n'
             return log, {'end_of_log': True}
         except Exception as e:  # pylint: disable=broad-except
-            log = '*** Unable to read remote log from {}\n*** {}\n\n'.format(remote_loc, str(e))
+            log = f'*** Unable to read remote log from {remote_loc}\n*** {str(e)}\n\n'
             self.log.error(log)
             local_log, metadata = super()._read(ti, try_number)
             log += local_log
@@ -178,7 +178,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
             log = '\n'.join([old_log, log]) if old_log else log
         except Exception as e:  # pylint: disable=broad-except
             if not hasattr(e, 'resp') or e.resp.get('status') != '404':  # pylint: disable=no-member
-                log = '*** Previous log discarded: {}\n\n'.format(str(e)) + log
+                log = f'*** Previous log discarded: {str(e)}\n\n' + log
                 self.log.info("Previous log discarded: %s", e)
 
         try:
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index ea2c9d3..28956ec 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -729,9 +729,7 @@ class BigQueryExecuteQueryOperator(BaseOperator):
                 for s in self.sql
             ]
         else:
-            raise AirflowException(
-                "argument 'sql' of type {} is neither a string nor an iterable".format(type(str))
-            )
+            raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
         context['task_instance'].xcom_push(key='job_id', value=job_id)
 
     def on_kill(self) -> None:
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index ac93915..3bcb0aa 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1455,7 +1455,7 @@ class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator):
     @staticmethod
     def _generate_temp_filename(filename):
         date = time.strftime('%Y%m%d%H%M%S')
-        return "{}_{}_{}".format(date, str(uuid.uuid4())[:8], ntpath.basename(filename))
+        return f"{date}_{str(uuid.uuid4())[:8]}_{ntpath.basename(filename)}"
 
     def _upload_file_temp(self, bucket, local_file):
         """Upload a local file to a Google Cloud Storage bucket."""
diff --git a/airflow/providers/jenkins/operators/jenkins_job_trigger.py b/airflow/providers/jenkins/operators/jenkins_job_trigger.py
index 69d0f63..91b8ca9 100644
--- a/airflow/providers/jenkins/operators/jenkins_job_trigger.py
+++ b/airflow/providers/jenkins/operators/jenkins_job_trigger.py
@@ -54,7 +54,7 @@ def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) -> Optio
         response_headers = response.headers
         if response_body is None:
             raise jenkins.EmptyResponseException(
-                "Error communicating with server[%s]: empty response" % jenkins_server.server
+                f"Error communicating with server[{jenkins_server.server}]: empty response"
             )
         return {'body': response_body.decode('utf-8'), 'headers': response_headers}
     except HTTPError as e:
@@ -66,9 +66,9 @@ def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) -> Optio
         else:
             raise
     except socket.timeout as e:
-        raise jenkins.TimeoutException('Error in request: %s' % e)
+        raise jenkins.TimeoutException(f'Error in request: {e}')
     except URLError as e:
-        raise JenkinsException('Error in request: %s' % e.reason)
+        raise JenkinsException(f'Error in request: {e.reason}')
     return None
 
 
diff --git a/airflow/providers/jira/hooks/jira.py b/airflow/providers/jira/hooks/jira.py
index 8951c12..5d186b8 100644
--- a/airflow/providers/jira/hooks/jira.py
+++ b/airflow/providers/jira/hooks/jira.py
@@ -82,8 +82,8 @@ class JiraHook(BaseHook):
                     proxies=self.proxies,
                 )
             except JIRAError as jira_error:
-                raise AirflowException('Failed to create jira client, jira error: %s' % str(jira_error))
+                raise AirflowException(f'Failed to create jira client, jira error: {str(jira_error)}')
             except Exception as e:
-                raise AirflowException('Failed to create jira client, error: %s' % str(e))
+                raise AirflowException(f'Failed to create jira client, error: {str(e)}')
 
         return self.client
diff --git a/airflow/providers/jira/operators/jira.py b/airflow/providers/jira/operators/jira.py
index 3550d1f..1e9530c 100644
--- a/airflow/providers/jira/operators/jira.py
+++ b/airflow/providers/jira/operators/jira.py
@@ -89,6 +89,6 @@ class JiraOperator(BaseOperator):
             return jira_result
 
         except JIRAError as jira_error:
-            raise AirflowException("Failed to execute jiraOperator, error: %s" % str(jira_error))
+            raise AirflowException(f"Failed to execute jiraOperator, error: {str(jira_error)}")
         except Exception as e:
-            raise AirflowException("Jira operator error: %s" % str(e))
+            raise AirflowException(f"Jira operator error: {str(e)}")
diff --git a/airflow/providers/microsoft/azure/operators/azure_container_instances.py b/airflow/providers/microsoft/azure/operators/azure_container_instances.py
index 74418ac..70355ed 100644
--- a/airflow/providers/microsoft/azure/operators/azure_container_instances.py
+++ b/airflow/providers/microsoft/azure/operators/azure_container_instances.py
@@ -278,7 +278,7 @@ class AzureContainerInstancesOperator(BaseOperator):
 
             self.log.info("Container had exit code: %s", exit_code)
             if exit_code != 0:
-                raise AirflowException("Container had a non-zero exit code, %s" % exit_code)
+                raise AirflowException(f"Container had a non-zero exit code, {exit_code}")
             return exit_code
 
         except CloudError:
diff --git a/airflow/providers/microsoft/winrm/operators/winrm.py b/airflow/providers/microsoft/winrm/operators/winrm.py
index 5500e3d..55d65cf 100644
--- a/airflow/providers/microsoft/winrm/operators/winrm.py
+++ b/airflow/providers/microsoft/winrm/operators/winrm.py
@@ -129,7 +129,7 @@ class WinRMOperator(BaseOperator):
             self.winrm_hook.winrm_protocol.close_shell(winrm_client)  # type: ignore[attr-defined]
 
         except Exception as e:
-            raise AirflowException("WinRM operator error: {}".format(str(e)))
+            raise AirflowException(f"WinRM operator error: {str(e)}")
 
         if return_code == 0:
             # returning output if do_xcom_push is set
diff --git a/airflow/providers/mysql/hooks/mysql.py b/airflow/providers/mysql/hooks/mysql.py
index 5da0be2..7d1f71f 100644
--- a/airflow/providers/mysql/hooks/mysql.py
+++ b/airflow/providers/mysql/hooks/mysql.py
@@ -164,12 +164,10 @@ class MySqlHook(DbApiHook):
         conn = self.get_conn()
         cur = conn.cursor()
         cur.execute(
-            """
+            f"""
             LOAD DATA LOCAL INFILE '{tmp_file}'
             INTO TABLE {table}
-            """.format(
-                tmp_file=tmp_file, table=table
-            )
+            """
         )
         conn.commit()
 
@@ -178,12 +176,10 @@ class MySqlHook(DbApiHook):
         conn = self.get_conn()
         cur = conn.cursor()
         cur.execute(
-            """
+            f"""
             SELECT * INTO OUTFILE '{tmp_file}'
             FROM {table}
-            """.format(
-                tmp_file=tmp_file, table=table
-            )
+            """
         )
         conn.commit()
 
@@ -251,17 +247,12 @@ class MySqlHook(DbApiHook):
         cursor = conn.cursor()
 
         cursor.execute(
-            """
+            f"""
             LOAD DATA LOCAL INFILE '{tmp_file}'
             {duplicate_key_handling}
             INTO TABLE {table}
             {extra_options}
-            """.format(
-                tmp_file=tmp_file,
-                table=table,
-                duplicate_key_handling=duplicate_key_handling,
-                extra_options=extra_options,
-            )
+            """
         )
 
         cursor.close()
diff --git a/airflow/providers/opsgenie/hooks/opsgenie_alert.py b/airflow/providers/opsgenie/hooks/opsgenie_alert.py
index aad834c..60f1734 100644
--- a/airflow/providers/opsgenie/hooks/opsgenie_alert.py
+++ b/airflow/providers/opsgenie/hooks/opsgenie_alert.py
@@ -82,5 +82,5 @@ class OpsgenieAlertHook(HttpHook):
         return self.run(
             endpoint='v2/alerts',
             data=json.dumps(payload),
-            headers={'Content-Type': 'application/json', 'Authorization': 'GenieKey %s' % api_key},
+            headers={'Content-Type': 'application/json', 'Authorization': f'GenieKey {api_key}'},
         )
diff --git a/airflow/providers/oracle/hooks/oracle.py b/airflow/providers/oracle/hooks/oracle.py
index bfb40a4..1c5c536 100644
--- a/airflow/providers/oracle/hooks/oracle.py
+++ b/airflow/providers/oracle/hooks/oracle.py
@@ -178,7 +178,7 @@ class OracleHook(DbApiHook):
                 else:
                     lst.append(str(cell))
             values = tuple(lst)
-            sql = 'INSERT /*+ APPEND */ INTO {} {} VALUES ({})'.format(table, target_fields, ','.join(values))
+            sql = f"INSERT /*+ APPEND */ INTO {table} {target_fields} VALUES ({','.join(values)})"
             cur.execute(sql)
             if i % commit_every == 0:
                 conn.commit()  # type: ignore[attr-defined]
diff --git a/airflow/providers/pagerduty/hooks/pagerduty.py b/airflow/providers/pagerduty/hooks/pagerduty.py
index 4de43df..e42357a 100644
--- a/airflow/providers/pagerduty/hooks/pagerduty.py
+++ b/airflow/providers/pagerduty/hooks/pagerduty.py
@@ -145,7 +145,7 @@ class PagerdutyHook(BaseHook):
 
         actions = ('trigger', 'acknowledge', 'resolve')
         if action not in actions:
-            raise ValueError("Event action must be one of: %s" % ', '.join(actions))
+            raise ValueError(f"Event action must be one of: {', '.join(actions)}")
         data = {
             "event_action": action,
             "payload": payload,
diff --git a/airflow/providers/plexus/operators/job.py b/airflow/providers/plexus/operators/job.py
index ece5df6..8f56987 100644
--- a/airflow/providers/plexus/operators/job.py
+++ b/airflow/providers/plexus/operators/job.py
@@ -149,9 +149,7 @@ class PlexusJobOperator(BaseOperator):
         """
         missing_params = self.required_params - set(self.job_params)
         if len(missing_params) > 0:
-            raise AirflowException(
-                "Missing the following required job_params: {}".format(", ".join(missing_params))
-            )
+            raise AirflowException(f"Missing the following required job_params: {', '.join(missing_params)}")
         params = {}
         for prm in self.job_params:
             if prm in self.lookups:
diff --git a/airflow/providers/postgres/hooks/postgres.py b/airflow/providers/postgres/hooks/postgres.py
index 0f38823..cd80bd9 100644
--- a/airflow/providers/postgres/hooks/postgres.py
+++ b/airflow/providers/postgres/hooks/postgres.py
@@ -223,7 +223,7 @@ class PostgresHook(DbApiHook):
         else:
             target_fields_fragment = ''
 
-        sql = "INSERT INTO {} {} VALUES ({})".format(table, target_fields_fragment, ",".join(placeholders))
+        sql = f"INSERT INTO {table} {target_fields_fragment} VALUES ({','.join(placeholders)})"
 
         if replace:
             if target_fields is None:
diff --git a/airflow/providers/qubole/hooks/qubole.py b/airflow/providers/qubole/hooks/qubole.py
index 0cc3832..6655de5 100644
--- a/airflow/providers/qubole/hooks/qubole.py
+++ b/airflow/providers/qubole/hooks/qubole.py
@@ -262,7 +262,7 @@ class QuboleHook(BaseHook):
         for key, value in self.kwargs.items():  # pylint: disable=too-many-nested-blocks
             if key in COMMAND_ARGS[cmd_type]:
                 if key in HYPHEN_ARGS:
-                    args.append("--{}={}".format(key.replace('_', '-'), value))
+                    args.append(f"--{key.replace('_', '-')}={value}")
                 elif key in positional_args_list:
                     inplace_args = value
                 elif key == 'tags':
@@ -273,7 +273,7 @@ class QuboleHook(BaseHook):
                 else:
                     args.append(f"--{key}={value}")
 
-        args.append("--tags={}".format(','.join(filter(None, tags))))
+        args.append(f"--tags={','.join(filter(None, tags))}")
 
         if inplace_args is not None:
             args += inplace_args.split(' ')
diff --git a/airflow/providers/salesforce/hooks/salesforce.py b/airflow/providers/salesforce/hooks/salesforce.py
index aeaef5d..d76baac 100644
--- a/airflow/providers/salesforce/hooks/salesforce.py
+++ b/airflow/providers/salesforce/hooks/salesforce.py
@@ -146,7 +146,7 @@ class SalesforceHook(BaseHook):
         :return: all instances of the object from Salesforce.
         :rtype: dict
         """
-        query = "SELECT {} FROM {}".format(",".join(fields), obj)
+        query = f"SELECT {','.join(fields)} FROM {obj}"
 
         self.log.info(
             "Making query to Salesforce: %s",
diff --git a/airflow/providers/sftp/operators/sftp.py b/airflow/providers/sftp/operators/sftp.py
index 39cc14b..f137352 100644
--- a/airflow/providers/sftp/operators/sftp.py
+++ b/airflow/providers/sftp/operators/sftp.py
@@ -154,7 +154,7 @@ class SFTPOperator(BaseOperator):
                     sftp_client.put(self.local_filepath, self.remote_filepath, confirm=self.confirm)
 
         except Exception as e:
-            raise AirflowException("Error while transferring {}, error: {}".format(file_msg, str(e)))
+            raise AirflowException(f"Error while transferring {file_msg}, error: {str(e)}")
 
         return self.local_filepath
 
diff --git a/airflow/providers/ssh/operators/ssh.py b/airflow/providers/ssh/operators/ssh.py
index 14cec96..32ac1f2 100644
--- a/airflow/providers/ssh/operators/ssh.py
+++ b/airflow/providers/ssh/operators/ssh.py
@@ -168,7 +168,7 @@ class SSHOperator(BaseOperator):
                     raise AirflowException(f"error running cmd: {self.command}, error: {error_msg}")
 
         except Exception as e:
-            raise AirflowException("SSH operator error: {}".format(str(e)))
+            raise AirflowException(f"SSH operator error: {str(e)}")
 
         return True
 
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index 87c269d..f1ddfdf 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -56,7 +56,7 @@ def renew_from_kt(principal: str, keytab: str, exit_on_fail: bool = True):
     """
     # The config is specified in seconds. But we ask for that same amount in
     # minutes to give ourselves a large renewal buffer.
-    renewal_lifetime = "%sm" % conf.getint('kerberos', 'reinit_frequency')
+    renewal_lifetime = f"{conf.getint('kerberos', 'reinit_frequency')}m"
 
     cmd_principal = principal or conf.get('kerberos', 'principal').replace("_HOST", socket.getfqdn())
 
@@ -128,7 +128,7 @@ def perform_krb181_workaround(principal: str):
     ret = subprocess.call(cmdv, close_fds=True)
 
     if ret != 0:
-        principal = "{}/{}".format(principal or conf.get('kerberos', 'principal'), socket.getfqdn())
+        principal = f"{principal or conf.get('kerberos', 'principal')}/{socket.getfqdn()}"
         princ = principal
         ccache = conf.get('kerberos', 'principal')
         log.error(
diff --git a/airflow/security/utils.py b/airflow/security/utils.py
index e5ceadb..ca203b5 100644
--- a/airflow/security/utils.py
+++ b/airflow/security/utils.py
@@ -55,7 +55,7 @@ def replace_hostname_pattern(components, host=None):
     fqdn = host
     if not fqdn or fqdn == '0.0.0.0':
         fqdn = get_hostname()
-    return '{}/{}@{}'.format(components[0], fqdn.lower(), components[2])
+    return f'{components[0]}/{fqdn.lower()}@{components[2]}'
 
 
 def get_fqdn(hostname_or_ip=None):
diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py
index 9e997bc..4a6f11b 100644
--- a/airflow/sensors/date_time.py
+++ b/airflow/sensors/date_time.py
@@ -65,7 +65,7 @@ class DateTimeSensor(BaseSensorOperator):
             self.target_time = target_time
         else:
             raise TypeError(
-                "Expected str or datetime.datetime type for target_time. Got {}".format(type(target_time))
+                f"Expected str or datetime.datetime type for target_time. Got {type(target_time)}"
             )
 
     def poke(self, context: Dict) -> bool:
diff --git a/airflow/sensors/sql.py b/airflow/sensors/sql.py
index 573c7cd..923af6c 100644
--- a/airflow/sensors/sql.py
+++ b/airflow/sensors/sql.py
@@ -89,7 +89,7 @@ class SqlSensor(BaseSensorOperator):
         if conn.conn_type not in allowed_conn_type:
             raise AirflowException(
                 "The connection type is not supported by SqlSensor. "
-                + "Supported connection types: {}".format(list(allowed_conn_type))
+                + f"Supported connection types: {list(allowed_conn_type)}"
             )
         return conn.get_hook()
 
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
index 68a0b44..600a693 100644
--- a/airflow/utils/cli.py
+++ b/airflow/utils/cli.py
@@ -265,7 +265,7 @@ def sigquit_handler(sig, frame):  # pylint: disable=unused-argument
     id_to_name = {th.ident: th.name for th in threading.enumerate()}
     code = []
     for thread_id, stack in sys._current_frames().items():  # pylint: disable=protected-access
-        code.append("\n# Thread: {}({})".format(id_to_name.get(thread_id, ""), thread_id))
+        code.append(f"\n# Thread: {id_to_name.get(thread_id, '')}({thread_id})")
         for filename, line_number, name, line in traceback.extract_stack(stack):
             code.append(f'File: "{filename}", line {line_number}, in {name}')
             if line:
diff --git a/airflow/utils/code_utils.py b/airflow/utils/code_utils.py
index 77cfa42..53b2db1 100644
--- a/airflow/utils/code_utils.py
+++ b/airflow/utils/code_utils.py
@@ -50,7 +50,7 @@ def get_python_source(x: Any) -> Optional[str]:
             pass
 
     if source_code is None:
-        source_code = 'No source code available for {}'.format(type(x))
+        source_code = f'No source code available for {type(x)}'
     return source_code
 
 
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index 6f04cbf..7617bda 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -120,7 +120,7 @@ class FileTaskHandler(logging.Handler):
                     log += "".join(file.readlines())
             except Exception as e:  # pylint: disable=broad-except
                 log = f"*** Failed to load local log file: {location}\n"
-                log += "*** {}\n".format(str(e))
+                log += f"*** {str(e)}\n"
         elif conf.get('core', 'executor') == 'KubernetesExecutor':  # pylint: disable=too-many-nested-blocks
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
@@ -158,7 +158,7 @@ class FileTaskHandler(logging.Handler):
                     log += line.decode()
 
             except Exception as f:  # pylint: disable=broad-except
-                log += '*** Unable to fetch logs from worker pod {} ***\n{}\n\n'.format(ti.hostname, str(f))
+                log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n'
         else:
             url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path).format(
                 ti=ti, worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')
@@ -180,7 +180,7 @@ class FileTaskHandler(logging.Handler):
 
                 log += '\n' + response.text
             except Exception as e:  # pylint: disable=broad-except
-                log += "*** Failed to fetch log file from worker. {}\n".format(str(e))
+                log += f"*** Failed to fetch log file from worker. {str(e)}\n"
 
         return log, {'end_of_log': True}
 
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index d302cbe..2b2521a 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -109,7 +109,7 @@ def make_aware(value, timezone=None):
 
     # Check that we won't overwrite the timezone of an aware datetime.
     if is_localized(value):
-        raise ValueError("make_aware expects a naive datetime, got %s" % value)
+        raise ValueError(f"make_aware expects a naive datetime, got {value}")
     if hasattr(value, 'fold'):
         # In case of python 3.6 we want to do the same that pendulum does for python3.5
         # i.e in case we move clock back we want to schedule the run at the time of the second
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 265a12f..09ebe9a 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -128,14 +128,14 @@ def generate_pages(current_page, num_of_pages, search=None, status=None, window=
     is_disabled = 'disabled' if current_page <= 0 else ''
     output.append(
         first_node.format(
-            href_link="?{}".format(get_params(page=0, search=search, status=status)),  # noqa
+            href_link=f"?{get_params(page=0, search=search, status=status)}",  # noqa
             disabled=is_disabled,
         )
     )
 
     page_link = void_link
     if current_page > 0:
-        page_link = '?{}'.format(get_params(page=(current_page - 1), search=search, status=status))
+        page_link = f'?{get_params(page=current_page - 1, search=search, status=status)}'
 
     output.append(previous_node.format(href_link=page_link, disabled=is_disabled))  # noqa
 
@@ -157,7 +157,7 @@ def generate_pages(current_page, num_of_pages, search=None, status=None, window=
             'is_active': 'active' if is_current(current_page, page) else '',
             'href_link': void_link
             if is_current(current_page, page)
-            else '?{}'.format(get_params(page=page, search=search, status=status)),
+            else f'?{get_params(page=page, search=search, status=status)}',
             'page_num': page + 1,
         }
         output.append(page_node.format(**vals))  # noqa
@@ -167,13 +167,13 @@ def generate_pages(current_page, num_of_pages, search=None, status=None, window=
     page_link = (
         void_link
         if current_page >= num_of_pages - 1
-        else '?{}'.format(get_params(page=current_page + 1, search=search, status=status))
+        else f'?{get_params(page=current_page + 1, search=search, status=status)}'
     )
 
     output.append(next_node.format(href_link=page_link, disabled=is_disabled))  # noqa
     output.append(
         last_node.format(
-            href_link="?{}".format(get_params(page=last_page, search=search, status=status)),  # noqa
+            href_link=f"?{get_params(page=last_page, search=search, status=status)}",  # noqa
             disabled=is_disabled,
         )
     )
diff --git a/airflow/www/validators.py b/airflow/www/validators.py
index 9699a47..fe35dcb 100644
--- a/airflow/www/validators.py
+++ b/airflow/www/validators.py
@@ -37,7 +37,7 @@ class GreaterEqualThan(EqualTo):
         try:
             other = form[self.fieldname]
         except KeyError:
-            raise ValidationError(field.gettext("Invalid field name '%s'." % self.fieldname))
+            raise ValidationError(field.gettext(f"Invalid field name '{self.fieldname}'."))
 
         if field.data is None or other.data is None:
             return
@@ -50,7 +50,7 @@ class GreaterEqualThan(EqualTo):
             message = self.message
             if message is None:
                 message = field.gettext(
-                    'Field must be greater than or equal to %(other_label)s.' % message_args
+                    f"Field must be greater than or equal to {message_args['other_label']}."
                 )
             else:
                 message = message % message_args
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 1c00951..93ffc38 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1639,7 +1639,7 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: disable=too-many-public-m
         new_dag_state = set_dag_run_state_to_failed(dag, execution_date, commit=confirmed)
 
         if confirmed:
-            flash('Marked failed on {} task instances'.format(len(new_dag_state)))
+            flash(f'Marked failed on {len(new_dag_state)} task instances')
             return redirect(origin)
 
         else:
@@ -1668,7 +1668,7 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: disable=too-many-public-m
         new_dag_state = set_dag_run_state_to_success(dag, execution_date, commit=confirmed)
 
         if confirmed:
-            flash('Marked success on {} task instances'.format(len(new_dag_state)))
+            flash(f'Marked success on {len(new_dag_state)} task instances')
             return redirect(origin)
 
         else:
@@ -1752,7 +1752,7 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: disable=too-many-public-m
                 commit=True,
             )
 
-            flash("Marked {} on {} task instances".format(state, len(altered)))
+            flash(f"Marked {state} on {len(altered)} task instances")
             return redirect(origin)
 
         to_be_altered = set_state(
@@ -3410,10 +3410,7 @@ class DagRunModelView(AirflowModelView):
                 cleared_ti_count += len(tis)
                 models.clear_task_instances(tis, session, dag=dag)
 
-            flash(
-                "{count} dag runs and {altered_ti_count} task instances "
-                "were cleared".format(count=count, altered_ti_count=cleared_ti_count)
-            )
+            flash(f"{count} dag runs and {cleared_ti_count} task instances were cleared")
         except Exception:  # noqa pylint: disable=broad-except
             flash('Failed to clear state', 'error')
         return redirect(self.get_default_url())
@@ -3636,7 +3633,7 @@ class TaskInstanceModelView(AirflowModelView):
                 models.clear_task_instances(task_instances_list, session, dag=dag)
 
             session.commit()
-            flash("{} task instances have been cleared".format(len(task_instances)))
+            flash(f"{len(task_instances)} task instances have been cleared")
             self.update_redirect()
             return redirect(self.get_redirect())
         except Exception as e:  # noqa pylint: disable=broad-except
@@ -3651,11 +3648,7 @@ class TaskInstanceModelView(AirflowModelView):
             for ti in tis:
                 ti.set_state(target_state, session)
             session.commit()
-            flash(
-                "{count} task instances were set to '{target_state}'".format(
-                    count=count, target_state=target_state
-                )
-            )
+            flash(f"{count} task instances were set to '{target_state}'")
         except Exception:  # noqa pylint: disable=broad-except
             flash('Failed to set state', 'error')
 
diff --git a/breeze-complete b/breeze-complete
index 63b34ad..5562cee 100644
--- a/breeze-complete
+++ b/breeze-complete
@@ -97,6 +97,7 @@ dont-use-safe-filter
 end-of-file-fixer
 fix-encoding-pragma
 flake8
+flynt
 forbid-tabs
 helm-lint
 identity
diff --git a/docs/exts/airflow_intersphinx.py b/docs/exts/airflow_intersphinx.py
index ee83b8f..a3bd262 100644
--- a/docs/exts/airflow_intersphinx.py
+++ b/docs/exts/airflow_intersphinx.py
@@ -150,7 +150,7 @@ if __name__ == "__main__":
             except ValueError as exc:
                 print(exc.args[0] % exc.args[1:])
             except Exception as exc:  # pylint: disable=broad-except
-                print('Unknown error: %r' % exc)
+                print(f'Unknown error: {exc!r}')
 
         provider_mapping = _generate_provider_intersphinx_mapping()
 
diff --git a/docs/exts/exampleinclude.py b/docs/exts/exampleinclude.py
index 141ca82..815eb21 100644
--- a/docs/exts/exampleinclude.py
+++ b/docs/exts/exampleinclude.py
@@ -140,7 +140,7 @@ def register_source(app, env, modname):
     """
     entry = env._viewcode_modules.get(modname, None)
     if entry is False:
-        print("[%s] Entry is false for " % modname)
+        print(f"[{modname}] Entry is false for ")
         return False
 
     code_tags = app.emit_firstresult("viewcode-find-source", modname)
diff --git a/docs/exts/redirects.py b/docs/exts/redirects.py
index 7874124..20603c4 100644
--- a/docs/exts/redirects.py
+++ b/docs/exts/redirects.py
@@ -54,7 +54,7 @@ def generate_redirects(app):
             from_path = from_path.replace(in_suffix, '.html')
             to_path = to_path.replace(in_suffix, ".html")
 
-            to_path_prefix = "..%s" % os.path.sep * (len(from_path.split(os.path.sep)) - 1)
+            to_path_prefix = f"..{os.path.sep}" * (len(from_path.split(os.path.sep)) - 1)
             to_path = to_path_prefix + to_path
 
             log.debug("Resolved redirect '%s' to '%s'", from_path, to_path)
diff --git a/metastore_browser/hive_metastore.py b/metastore_browser/hive_metastore.py
index 58d4da3..462f245 100644
--- a/metastore_browser/hive_metastore.py
+++ b/metastore_browser/hive_metastore.py
@@ -95,7 +95,7 @@ class MetastoreBrowserView(BaseView):
     def partitions(self):
         """Retrieve table partitions"""
         schema, table = request.args.get("table").split('.')
-        sql = """
+        sql = f"""
         SELECT
             a.PART_NAME,
             a.CREATE_TIME,
@@ -111,9 +111,7 @@ class MetastoreBrowserView(BaseView):
             b.TBL_NAME like '{table}' AND
             d.NAME like '{schema}'
         ORDER BY PART_NAME DESC
-        """.format(
-            table=table, schema=schema
-        )
+        """
         hook = MySqlHook(METASTORE_MYSQL_CONN_ID)
         df = hook.get_pandas_df(sql)
         return df.to_html(
@@ -133,7 +131,7 @@ class MetastoreBrowserView(BaseView):
         if DB_DENY_LIST:
             dbs = ",".join(["'" + db + "'" for db in DB_DENY_LIST])
             where_clause = f"AND b.name NOT IN ({dbs})"
-        sql = """
+        sql = f"""
         SELECT CONCAT(b.NAME, '.', a.TBL_NAME), TBL_TYPE
         FROM TBLS a
         JOIN DBS b ON a.DB_ID = b.DB_ID
@@ -143,10 +141,8 @@ class MetastoreBrowserView(BaseView):
             b.NAME NOT LIKE '%tmp%' AND
             b.NAME NOT LIKE '%temp%'
         {where_clause}
-        LIMIT {LIMIT};
-        """.format(
-            where_clause=where_clause, LIMIT=TABLE_SELECTOR_LIMIT
-        )
+        LIMIT {TABLE_SELECTOR_LIMIT};
+        """
         hook = MySqlHook(METASTORE_MYSQL_CONN_ID)
         data = [{'id': row[0], 'text': row[0]} for row in hook.get_records(sql)]
         return json.dumps(data)
diff --git a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
index 038e027..688f767 100755
--- a/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
+++ b/scripts/ci/pre_commit/pre_commit_check_provider_yaml_files.py
@@ -98,16 +98,16 @@ def assert_sets_equal(set1, set2):
     try:
         difference1 = set1.difference(set2)
     except TypeError as e:
-        raise AssertionError('invalid type when attempting set difference: %s' % e)
+        raise AssertionError(f'invalid type when attempting set difference: {e}')
     except AttributeError as e:
-        raise AssertionError('first argument does not support set difference: %s' % e)
+        raise AssertionError(f'first argument does not support set difference: {e}')
 
     try:
         difference2 = set2.difference(set1)
     except TypeError as e:
-        raise AssertionError('invalid type when attempting set difference: %s' % e)
+        raise AssertionError(f'invalid type when attempting set difference: {e}')
     except AttributeError as e:
-        raise AssertionError('second argument does not support set difference: %s' % e)
+        raise AssertionError(f'second argument does not support set difference: {e}')
 
     if not (difference1 or difference2):
         return
diff --git a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
index 6558901..0480286 100755
--- a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
+++ b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
@@ -121,7 +121,7 @@ def _write_option(configfile, idx, option):
     if option["example"]:
         if not str(option["name"]).endswith("_template"):
             option["example"] = option["example"].replace("{", "{{").replace("}", "}}")
-        configfile.write("# Example: {} = {}\n".format(option["name"], option["example"]))
+        configfile.write(f"# Example: {option['name']} = {option['example']}\n")
 
     if option["default"] is not None:
         if not isinstance(option["default"], str):
@@ -134,9 +134,9 @@ def _write_option(configfile, idx, option):
             value = " " + option["default"]
         else:
             value = ""
-        configfile.write("{} ={}\n".format(option["name"], value))
+        configfile.write(f"{option['name']} ={value}\n")
     else:
-        configfile.write("# {} =\n".format(option["name"]))
+        configfile.write(f"# {option['name']} =\n")
 
 
 if __name__ == '__main__':
diff --git a/tests/api/common/experimental/test_pool.py b/tests/api/common/experimental/test_pool.py
index ae00226..3c75a14 100644
--- a/tests/api/common/experimental/test_pool.py
+++ b/tests/api/common/experimental/test_pool.py
@@ -39,7 +39,7 @@ class TestPool(unittest.TestCase):
         clear_db_pools()
         self.pools = [Pool.get_default_pool()]
         for i in range(self.USER_POOL_COUNT):
-            name = 'experimental_%s' % (i + 1)
+            name = f'experimental_{i + 1}'
             pool = models.Pool(
                 pool=name,
                 slots=i,
diff --git a/tests/cli/commands/test_connection_command.py b/tests/cli/commands/test_connection_command.py
index ae78892..c81ff81 100644
--- a/tests/cli/commands/test_connection_command.py
+++ b/tests/cli/commands/test_connection_command.py
@@ -496,7 +496,7 @@ class TestCliAddConnections(unittest.TestCase):
                     "connections",
                     "add",
                     "new0",
-                    "--conn-uri=%s" % TEST_URL,
+                    f"--conn-uri={TEST_URL}",
                     "--conn-description=new0 description",
                 ],
                 "Successfully added `conn_id`=new0 : postgresql://airflow:airflow@host:5432/airflow",
@@ -516,7 +516,7 @@ class TestCliAddConnections(unittest.TestCase):
                     "connections",
                     "add",
                     "new1",
-                    "--conn-uri=%s" % TEST_URL,
+                    f"--conn-uri={TEST_URL}",
                     "--conn-description=new1 description",
                 ],
                 "Successfully added `conn_id`=new1 : postgresql://airflow:airflow@host:5432/airflow",
@@ -536,7 +536,7 @@ class TestCliAddConnections(unittest.TestCase):
                     "connections",
                     "add",
                     "new2",
-                    "--conn-uri=%s" % TEST_URL,
+                    f"--conn-uri={TEST_URL}",
                     "--conn-extra",
                     "{'extra': 'yes'}",
                 ],
@@ -557,7 +557,7 @@ class TestCliAddConnections(unittest.TestCase):
                     "connections",
                     "add",
                     "new3",
-                    "--conn-uri=%s" % TEST_URL,
+                    f"--conn-uri={TEST_URL}",
                     "--conn-extra",
                     "{'extra': 'yes'}",
                     "--conn-description",
@@ -651,12 +651,12 @@ class TestCliAddConnections(unittest.TestCase):
     def test_cli_connections_add_duplicate(self):
         conn_id = "to_be_duplicated"
         connection_command.connections_add(
-            self.parser.parse_args(["connections", "add", conn_id, "--conn-uri=%s" % TEST_URL])
+            self.parser.parse_args(["connections", "add", conn_id, f"--conn-uri={TEST_URL}"])
         )
         # Check for addition attempt
         with pytest.raises(SystemExit, match=rf"A connection with `conn_id`={conn_id} already exists"):
             connection_command.connections_add(
-                self.parser.parse_args(["connections", "add", conn_id, "--conn-uri=%s" % TEST_URL])
+                self.parser.parse_args(["connections", "add", conn_id, f"--conn-uri={TEST_URL}"])
             )
 
     def test_cli_connections_add_delete_with_missing_parameters(self):
@@ -671,7 +671,7 @@ class TestCliAddConnections(unittest.TestCase):
         # Attempt to add with invalid uri
         with pytest.raises(SystemExit, match=r"The URI provided to --conn-uri is invalid: nonsense_uri"):
             connection_command.connections_add(
-                self.parser.parse_args(["connections", "add", "new1", "--conn-uri=%s" % "nonsense_uri"])
+                self.parser.parse_args(["connections", "add", "new1", f"--conn-uri={'nonsense_uri'}"])
             )
 
 
diff --git a/tests/core/test_core.py b/tests/core/test_core.py
index fae2c73..f6dd6ee 100644
--- a/tests/core/test_core.py
+++ b/tests/core/test_core.py
@@ -172,7 +172,7 @@ class TestCore(unittest.TestCase):
         op = BashOperator(
             task_id='test_bash_operator_kill',
             execution_timeout=timedelta(seconds=1),
-            bash_command="/bin/bash -c 'sleep %s'" % sleep_time,
+            bash_command=f"/bin/bash -c 'sleep {sleep_time}'",
             dag=self.dag,
         )
         with pytest.raises(AirflowTaskTimeout):
diff --git a/tests/dags/test_subdag.py b/tests/dags/test_subdag.py
index e45b2a1..f2227dc 100644
--- a/tests/dags/test_subdag.py
+++ b/tests/dags/test_subdag.py
@@ -48,7 +48,7 @@ def subdag(parent_dag_name, child_dag_name, args):
 
     for i in range(2):
         DummyOperator(
-            task_id='{}-task-{}'.format(child_dag_name, i + 1),
+            task_id=f'{child_dag_name}-task-{i + 1}',
             default_args=args,
             dag=dag_subdag,
         )
diff --git a/tests/dags_corrupted/test_impersonation_custom.py b/tests/dags_corrupted/test_impersonation_custom.py
index 77ea1ed..4d3b978 100644
--- a/tests/dags_corrupted/test_impersonation_custom.py
+++ b/tests/dags_corrupted/test_impersonation_custom.py
@@ -39,7 +39,7 @@ dag = DAG(dag_id='impersonation_with_custom_pkg', default_args=args)
 
 def print_today():
     date_time = FakeDatetime.utcnow()
-    print('Today is {}'.format(date_time.strftime('%Y-%m-%d')))
+    print(f"Today is {date_time.strftime('%Y-%m-%d')}")
 
 
 def check_hive_conf():
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 944fa49..44edc47 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -391,7 +391,7 @@ class ClassWithCustomAttributes:
             setattr(self, key, value)
 
     def __str__(self):
-        return "{}({})".format(ClassWithCustomAttributes.__name__, str(self.__dict__))
+        return f"{ClassWithCustomAttributes.__name__}({str(self.__dict__)})"
 
     def __repr__(self):
         return self.__str__()
diff --git a/tests/hooks/test_dbapi.py b/tests/hooks/test_dbapi.py
index 2cc916d..0f6c55a 100644
--- a/tests/hooks/test_dbapi.py
+++ b/tests/hooks/test_dbapi.py
@@ -125,7 +125,7 @@ class TestDbApiHook(unittest.TestCase):
         commit_count = 2  # The first and last commit
         assert commit_count == self.conn.commit.call_count
 
-        sql = "INSERT INTO {} ({}) VALUES (%s)".format(table, target_fields[0])
+        sql = f"INSERT INTO {table} ({target_fields[0]}) VALUES (%s)"
         for row in rows:
             self.cur.execute.assert_any_call(sql, row)
 
diff --git a/tests/models/test_baseoperator.py b/tests/models/test_baseoperator.py
index 95607a6..f942b89 100644
--- a/tests/models/test_baseoperator.py
+++ b/tests/models/test_baseoperator.py
@@ -42,7 +42,7 @@ class ClassWithCustomAttributes:
             setattr(self, key, value)
 
     def __str__(self):
-        return "{}({})".format(ClassWithCustomAttributes.__name__, str(self.__dict__))
+        return f"{ClassWithCustomAttributes.__name__}({str(self.__dict__)})"
 
     def __repr__(self):
         return self.__str__()
@@ -154,7 +154,7 @@ class TestBaseOperator(unittest.TestCase):
             ({"user_defined_macros": {"foo": "bar"}}, "{{ foo }}", {}, "bar"),
             ({"user_defined_macros": {"foo": "bar"}}, 1, {}, 1),
             (
-                {"user_defined_filters": {"hello": lambda name: "Hello %s" % name}},
+                {"user_defined_filters": {"hello": lambda name: f"Hello {name}"}},
                 "{{ 'world' | hello }}",
                 {},
                 "Hello world",
diff --git a/tests/models/test_connection.py b/tests/models/test_connection.py
index a96b89a..526d029 100644
--- a/tests/models/test_connection.py
+++ b/tests/models/test_connection.py
@@ -55,7 +55,7 @@ class UriTestCaseConfig:
 
     @staticmethod
     def uri_test_name(func, num, param):
-        return "{}_{}_{}".format(func.__name__, num, param.args[0].description.replace(' ', '_'))
+        return f"{func.__name__}_{num}_{param.args[0].description.replace(' ', '_')}"
 
 
 class TestConnection(unittest.TestCase):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 60171d8..123c119 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -476,7 +476,7 @@ class TestDag(unittest.TestCase):
 
     def test_user_defined_filters(self):
         def jinja_udf(name):
-            return 'Hello %s' % name
+            return f'Hello {name}'
 
         dag = models.DAG('test-dag', start_date=DEFAULT_DATE, user_defined_filters={"hello": jinja_udf})
         jinja_env = dag.get_template_env()
@@ -1540,7 +1540,7 @@ class TestDag(unittest.TestCase):
             )
 
             for i in range(2):
-                DummyOperator(task_id='{}-task-{}'.format(child_dag_name, i + 1), dag=dag_subdag)
+                DummyOperator(task_id=f'{child_dag_name}-task-{i + 1}', dag=dag_subdag)
 
             return dag_subdag
 
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 6c6b1cb..ce11892 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -105,7 +105,7 @@ class TestDagBag(unittest.TestCase):
                 dagbag = models.DagBag(include_examples=False, safe_mode=True)
 
             assert len(dagbag.dagbag_stats) == 1
-            assert dagbag.dagbag_stats[0].file == "/{}".format(os.path.basename(f.name))
+            assert dagbag.dagbag_stats[0].file == f"/{os.path.basename(f.name)}"
 
     def test_safe_mode_heuristic_mismatch(self):
         """With safe mode enabled, a file not matching the discovery heuristics
@@ -122,7 +122,7 @@ class TestDagBag(unittest.TestCase):
             with conf_vars({('core', 'dags_folder'): self.empty_dir}):
                 dagbag = models.DagBag(include_examples=False, safe_mode=False)
             assert len(dagbag.dagbag_stats) == 1
-            assert dagbag.dagbag_stats[0].file == "/{}".format(os.path.basename(f.name))
+            assert dagbag.dagbag_stats[0].file == f"/{os.path.basename(f.name)}"
 
     def test_process_file_that_contains_multi_bytes_char(self):
         """
@@ -298,7 +298,7 @@ class TestDagBag(unittest.TestCase):
         actual_found_dag_ids = list(map(lambda dag: dag.dag_id, actual_found_dags))
 
         for dag_id in expected_dag_ids:
-            actual_dagbag.log.info('validating %s' % dag_id)
+            actual_dagbag.log.info(f'validating {dag_id}')
             assert (
                 dag_id in actual_found_dag_ids
             ) == should_be_found, 'dag "{}" should {}have been found after processing dag "{}"'.format(
diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py
index 1cf4e3f..f754753 100644
--- a/tests/models/test_renderedtifields.py
+++ b/tests/models/test_renderedtifields.py
@@ -50,7 +50,7 @@ class ClassWithCustomAttributes:
             setattr(self, key, value)
 
     def __str__(self):
-        return "{}({})".format(ClassWithCustomAttributes.__name__, str(self.__dict__))
+        return f"{ClassWithCustomAttributes.__name__}({str(self.__dict__)})"
 
     def __repr__(self):
         return self.__str__()
diff --git a/tests/providers/amazon/aws/hooks/test_batch_waiters.py b/tests/providers/amazon/aws/hooks/test_batch_waiters.py
index 51d42e0..b852c2e 100644
--- a/tests/providers/amazon/aws/hooks/test_batch_waiters.py
+++ b/tests/providers/amazon/aws/hooks/test_batch_waiters.py
@@ -198,7 +198,7 @@ def batch_infrastructure(
     assert resp["jobDefinitionArn"]
     job_definition_arn = resp["jobDefinitionArn"]
     assert resp["revision"]
-    assert resp["jobDefinitionArn"].endswith("{}:{}".format(resp["jobDefinitionName"], resp["revision"]))
+    assert resp["jobDefinitionArn"].endswith(f"{resp['jobDefinitionName']}:{resp['revision']}")
 
     infrastructure.vpc_id = vpc_id
     infrastructure.subnet_id = subnet_id
diff --git a/tests/providers/amazon/aws/hooks/test_s3.py b/tests/providers/amazon/aws/hooks/test_s3.py
index d962068..fc00d9d 100644
--- a/tests/providers/amazon/aws/hooks/test_s3.py
+++ b/tests/providers/amazon/aws/hooks/test_s3.py
@@ -118,8 +118,8 @@ class TestAwsS3Hook:
         bucket = hook.get_bucket(s3_bucket)
 
         # we don't need to test the paginator that's covered by boto tests
-        keys = ["%s/b" % i for i in range(2)]
-        dirs = ["%s/" % i for i in range(2)]
+        keys = [f"{i}/b" for i in range(2)]
+        dirs = [f"{i}/" for i in range(2)]
         for key in keys:
             bucket.put_object(Key=key, Body=b'a')
 
diff --git a/tests/providers/apache/hive/operators/test_hive_stats.py b/tests/providers/apache/hive/operators/test_hive_stats.py
index d38e007..02dbdd8 100644
--- a/tests/providers/apache/hive/operators/test_hive_stats.py
+++ b/tests/providers/apache/hive/operators/test_hive_stats.py
@@ -274,17 +274,13 @@ class TestHiveStatsCollectionOperator(TestHiveEnvironment):
         hive_stats_collection_operator = HiveStatsCollectionOperator(**self.kwargs)
         hive_stats_collection_operator.execute(context={})
 
-        sql = """
+        sql = f"""
             DELETE FROM hive_stats
             WHERE
-                table_name='{}' AND
-                partition_repr='{}' AND
-                dttm='{}';
-            """.format(
-            hive_stats_collection_operator.table,
-            mock_json_dumps.return_value,
-            hive_stats_collection_operator.dttm,
-        )
+                table_name='{hive_stats_collection_operator.table}' AND
+                partition_repr='{mock_json_dumps.return_value}' AND
+                dttm='{hive_stats_collection_operator.dttm}';
+            """
         mock_mysql_hook.return_value.run.assert_called_once_with(sql)
 
     @unittest.skipIf(
diff --git a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
index c6f7736..0413b5c 100644
--- a/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
+++ b/tests/providers/apache/hive/transfers/test_mysql_to_hive.py
@@ -317,8 +317,8 @@ class TestTransfer(unittest.TestCase):
             with hook.get_conn() as conn:
                 conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
                 conn.execute(
-                    """
-                    CREATE TABLE {} (
+                    f"""
+                    CREATE TABLE {mysql_table} (
                         c0 TINYINT,
                         c1 SMALLINT,
                         c2 MEDIUMINT,
@@ -326,9 +326,7 @@ class TestTransfer(unittest.TestCase):
                         c4 BIGINT,
                         c5 TIMESTAMP
                     )
-                """.format(
-                        mysql_table
-                    )
+                """
                 )
 
             op = MySqlToHiveOperator(
@@ -368,14 +366,12 @@ class TestTransfer(unittest.TestCase):
             with hook.get_conn() as conn:
                 conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
                 conn.execute(
-                    """
-                    CREATE TABLE {} (
+                    f"""
+                    CREATE TABLE {mysql_table} (
                         c0 VARCHAR(25),
                         c1 VARCHAR(25)
                     )
-                """.format(
-                        mysql_table
-                    )
+                """
                 )
                 conn.execute(
                     """
@@ -475,8 +471,8 @@ class TestTransfer(unittest.TestCase):
             with hook.get_conn() as conn:
                 conn.execute(f"DROP TABLE IF EXISTS {mysql_table}")
                 conn.execute(
-                    """
-                    CREATE TABLE {} (
+                    f"""
+                    CREATE TABLE {mysql_table} (
                         c0 TINYINT   UNSIGNED,
                         c1 SMALLINT  UNSIGNED,
                         c2 MEDIUMINT UNSIGNED,
@@ -488,9 +484,7 @@ class TestTransfer(unittest.TestCase):
                         c8 INT,
                         c9 BIGINT
                     )
-                """.format(
-                        mysql_table
-                    )
+                """
                 )
                 conn.execute(
                     """
diff --git a/tests/providers/apache/spark/hooks/test_spark_sql.py b/tests/providers/apache/spark/hooks/test_spark_sql.py
index 85a5159..35e4330 100644
--- a/tests/providers/apache/spark/hooks/test_spark_sql.py
+++ b/tests/providers/apache/spark/hooks/test_spark_sql.py
@@ -60,11 +60,11 @@ class TestSparkSqlHook(unittest.TestCase):
         cmd = ' '.join(hook._prepare_command(""))
 
         # Check all the parameters
-        assert "--executor-cores {}".format(self._config['executor_cores']) in cmd
-        assert "--executor-memory {}".format(self._config['executor_memory']) in cmd
-        assert "--keytab {}".format(self._config['keytab']) in cmd
-        assert "--name {}".format(self._config['name']) in cmd
-        assert "--num-executors {}".format(self._config['num_executors']) in cmd
+        assert f"--executor-cores {self._config['executor_cores']}" in cmd
+        assert f"--executor-memory {self._config['executor_memory']}" in cmd
+        assert f"--keytab {self._config['keytab']}" in cmd
+        assert f"--name {self._config['name']}" in cmd
+        assert f"--num-executors {self._config['num_executors']}" in cmd
         sql_path = get_after('-f', hook._prepare_command(""))
         assert self._config['sql'].strip() == sql_path
 
diff --git a/tests/providers/apache/sqoop/hooks/test_sqoop.py b/tests/providers/apache/sqoop/hooks/test_sqoop.py
index 332021a..08926d4 100644
--- a/tests/providers/apache/sqoop/hooks/test_sqoop.py
+++ b/tests/providers/apache/sqoop/hooks/test_sqoop.py
@@ -179,29 +179,29 @@ class TestSqoopHook(unittest.TestCase):
 
         # Check if the config has been extracted from the json
         if self._config_json['namenode']:
-            assert "-fs {}".format(self._config_json['namenode']) in cmd
+            assert f"-fs {self._config_json['namenode']}" in cmd
 
         if self._config_json['job_tracker']:
-            assert "-jt {}".format(self._config_json['job_tracker']) in cmd
+            assert f"-jt {self._config_json['job_tracker']}" in cmd
 
         if self._config_json['libjars']:
-            assert "-libjars {}".format(self._config_json['libjars']) in cmd
+            assert f"-libjars {self._config_json['libjars']}" in cmd
 
         if self._config_json['files']:
-            assert "-files {}".format(self._config_json['files']) in cmd
+            assert f"-files {self._config_json['files']}" in cmd
 
         if self._config_json['archives']:
-            assert "-archives {}".format(self._config_json['archives']) in cmd
+            assert f"-archives {self._config_json['archives']}" in cmd
 
-        assert "--hcatalog-database {}".format(self._config['hcatalog_database']) in cmd
-        assert "--hcatalog-table {}".format(self._config['hcatalog_table']) in cmd
+        assert f"--hcatalog-database {self._config['hcatalog_database']}" in cmd
+        assert f"--hcatalog-table {self._config['hcatalog_table']}" in cmd
 
         # Check the regulator stuff passed by the default constructor
         if self._config['verbose']:
             assert "--verbose" in cmd
 
         if self._config['num_mappers']:
-            assert "--num-mappers {}".format(self._config['num_mappers']) in cmd
+            assert f"--num-mappers {self._config['num_mappers']}" in cmd
 
         for key, value in self._config['properties'].items():
             assert f"-D {key}={value}" in cmd
@@ -243,21 +243,14 @@ class TestSqoopHook(unittest.TestCase):
             )
         )
 
-        assert "--input-null-string {}".format(self._config_export['input_null_string']) in cmd
-        assert "--input-null-non-string {}".format(self._config_export['input_null_non_string']) in cmd
-        assert "--staging-table {}".format(self._config_export['staging_table']) in cmd
-        assert "--enclosed-by {}".format(self._config_export['enclosed_by']) in cmd
-        assert "--escaped-by {}".format(self._config_export['escaped_by']) in cmd
-        assert (
-            "--input-fields-terminated-by {}".format(self._config_export['input_fields_terminated_by']) in cmd
-        )
-        assert (
-            "--input-lines-terminated-by {}".format(self._config_export['input_lines_terminated_by']) in cmd
-        )
-        assert (
-            "--input-optionally-enclosed-by {}".format(self._config_export['input_optionally_enclosed_by'])
-            in cmd
-        )
+        assert f"--input-null-string {self._config_export['input_null_string']}" in cmd
+        assert f"--input-null-non-string {self._config_export['input_null_non_string']}" in cmd
+        assert f"--staging-table {self._config_export['staging_table']}" in cmd
+        assert f"--enclosed-by {self._config_export['enclosed_by']}" in cmd
+        assert f"--escaped-by {self._config_export['escaped_by']}" in cmd
+        assert f"--input-fields-terminated-by {self._config_export['input_fields_terminated_by']}" in cmd
+        assert f"--input-lines-terminated-by {self._config_export['input_lines_terminated_by']}" in cmd
+        assert f"--input-optionally-enclosed-by {self._config_export['input_optionally_enclosed_by']}" in cmd
         # these options are from the extra export options
         assert "--update-key id" in cmd
         assert "--update-mode allowinsert" in cmd
@@ -301,10 +294,10 @@ class TestSqoopHook(unittest.TestCase):
         if self._config_import['direct']:
             assert '--direct' in cmd
 
-        assert '--target-dir {}'.format(self._config_import['target_dir']) in cmd
+        assert f"--target-dir {self._config_import['target_dir']}" in cmd
 
-        assert '--driver {}'.format(self._config_import['driver']) in cmd
-        assert '--split-by {}'.format(self._config_import['split_by']) in cmd
+        assert f"--driver {self._config_import['driver']}" in cmd
+        assert f"--split-by {self._config_import['split_by']}" in cmd
         # these are from extra options, but not passed to this cmd import command
         assert '--show' not in cmd
         assert 'hcatalog-storage-stanza \"stored as orcfile\"' not in cmd
diff --git a/tests/providers/elasticsearch/log/elasticmock/__init__.py b/tests/providers/elasticsearch/log/elasticmock/__init__.py
index 2490dbe..c2a9080 100644
--- a/tests/providers/elasticsearch/log/elasticmock/__init__.py
+++ b/tests/providers/elasticsearch/log/elasticmock/__init__.py
@@ -51,7 +51,7 @@ ELASTIC_INSTANCES = {}  # type: Dict[str, FakeElasticsearch]
 
 def _get_elasticmock(hosts=None, *args, **kwargs):  # pylint: disable=unused-argument
     host = _normalize_hosts(hosts)[0]
-    elastic_key = '{}:{}'.format(host.get('host', 'localhost'), host.get('port', 9200))
+    elastic_key = f"{host.get('host', 'localhost')}:{host.get('port', 9200)}"
 
     if elastic_key in ELASTIC_INSTANCES:
         connection = ELASTIC_INSTANCES.get(elastic_key)
diff --git a/tests/providers/google/cloud/hooks/test_cloud_sql.py b/tests/providers/google/cloud/hooks/test_cloud_sql.py
index 003245d..9e9ec70 100644
--- a/tests/providers/google/cloud/hooks/test_cloud_sql.py
+++ b/tests/providers/google/cloud/hooks/test_cloud_sql.py
@@ -1061,9 +1061,7 @@ class TestCloudSqlDatabaseQueryHook(unittest.TestCase):
         project = self.sql_connection.extra_dejson['project_id']
         location = self.sql_connection.extra_dejson['location']
         instance = self.sql_connection.extra_dejson['instance']
-        instance_spec = "{project}:{location}:{instance}".format(
-            project=project, location=location, instance=instance
-        )
+        instance_spec = f"{project}:{location}:{instance}"
         assert sqlproxy_runner.instance_specification == instance_spec
 
     @mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection")
diff --git a/tests/providers/google/cloud/hooks/test_pubsub.py b/tests/providers/google/cloud/hooks/test_pubsub.py
index 0841806..4086526 100644
--- a/tests/providers/google/cloud/hooks/test_pubsub.py
+++ b/tests/providers/google/cloud/hooks/test_pubsub.py
@@ -130,17 +130,17 @@ class TestPubSubHook(unittest.TestCase):
     @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
     def test_delete_nonexisting_topic_failifnotexists(self, mock_service):
         mock_service.return_value.delete_topic.side_effect = NotFound(
-            'Topic does not exists: %s' % EXPANDED_TOPIC
+            f'Topic does not exists: {EXPANDED_TOPIC}'
         )
         with pytest.raises(PubSubException) as ctx:
             self.pubsub_hook.delete_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_not_exists=True)
 
-        assert str(ctx.value) == 'Topic does not exist: %s' % EXPANDED_TOPIC
+        assert str(ctx.value) == f'Topic does not exist: {EXPANDED_TOPIC}'
 
     @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
     def test_delete_topic_api_call_error(self, mock_service):
         mock_service.return_value.delete_topic.side_effect = GoogleAPICallError(
-            'Error deleting topic: %s' % EXPANDED_TOPIC
+            f'Error deleting topic: {EXPANDED_TOPIC}'
         )
         with pytest.raises(PubSubException):
             self.pubsub_hook.delete_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_not_exists=True)
@@ -148,23 +148,23 @@ class TestPubSubHook(unittest.TestCase):
     @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
     def test_create_preexisting_topic_failifexists(self, mock_service):
         mock_service.return_value.create_topic.side_effect = AlreadyExists(
-            'Topic already exists: %s' % TEST_TOPIC
+            f'Topic already exists: {TEST_TOPIC}'
         )
         with pytest.raises(PubSubException) as ctx:
             self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_exists=True)
-        assert str(ctx.value) == 'Topic already exists: %s' % TEST_TOPIC
+        assert str(ctx.value) == f'Topic already exists: {TEST_TOPIC}'
 
     @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
     def test_create_preexisting_topic_nofailifexists(self, mock_service):
         mock_service.return_value.create_topic.side_effect = AlreadyExists(
-            'Topic already exists: %s' % EXPANDED_TOPIC
+            f'Topic already exists: {EXPANDED_TOPIC}'
         )
         self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC)
 
     @mock.patch(PUBSUB_STRING.format('PubSubHook.get_conn'))
     def test_create_topic_api_call_error(self, mock_service):
         mock_service.return_value.create_topic.side_effect = GoogleAPICallError(
-            'Error creating topic: %s' % TEST_TOPIC
+            f'Error creating topic: {TEST_TOPIC}'
         )
         with pytest.raises(PubSubException):
             self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC, fail_if_exists=True)
@@ -238,18 +238,18 @@ class TestPubSubHook(unittest.TestCase):
     @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
     def test_delete_nonexisting_subscription_failifnotexists(self, mock_service):
         mock_service.delete_subscription.side_effect = NotFound(
-            'Subscription does not exists: %s' % EXPANDED_SUBSCRIPTION
+            f'Subscription does not exists: {EXPANDED_SUBSCRIPTION}'
         )
         with pytest.raises(PubSubException) as ctx:
             self.pubsub_hook.delete_subscription(
                 project_id=TEST_PROJECT, subscription=TEST_SUBSCRIPTION, fail_if_not_exists=True
             )
-        assert str(ctx.value) == 'Subscription does not exist: %s' % EXPANDED_SUBSCRIPTION
+        assert str(ctx.value) == f'Subscription does not exist: {EXPANDED_SUBSCRIPTION}'
 
     @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
     def test_delete_subscription_api_call_error(self, mock_service):
         mock_service.delete_subscription.side_effect = GoogleAPICallError(
-            'Error deleting subscription %s' % EXPANDED_SUBSCRIPTION
+            f'Error deleting subscription {EXPANDED_SUBSCRIPTION}'
         )
         with pytest.raises(PubSubException):
             self.pubsub_hook.delete_subscription(
@@ -262,7 +262,7 @@ class TestPubSubHook(unittest.TestCase):
         self, mock_uuid, mock_service
     ):  # noqa  # pylint: disable=unused-argument,line-too-long
         create_method = mock_service.create_subscription
-        expected_name = EXPANDED_SUBSCRIPTION.replace(TEST_SUBSCRIPTION, 'sub-%s' % TEST_UUID)
+        expected_name = EXPANDED_SUBSCRIPTION.replace(TEST_SUBSCRIPTION, f'sub-{TEST_UUID}')
 
         response = self.pubsub_hook.create_subscription(project_id=TEST_PROJECT, topic=TEST_TOPIC)
         create_method.assert_called_once_with(
@@ -282,7 +282,7 @@ class TestPubSubHook(unittest.TestCase):
             timeout=None,
             metadata=None,
         )
-        assert 'sub-%s' % TEST_UUID == response
+        assert f'sub-{TEST_UUID}' == response
 
     @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
     def test_create_subscription_with_ack_deadline(self, mock_service):
@@ -342,18 +342,18 @@ class TestPubSubHook(unittest.TestCase):
     @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
     def test_create_subscription_failifexists(self, mock_service):
         mock_service.create_subscription.side_effect = AlreadyExists(
-            'Subscription already exists: %s' % EXPANDED_SUBSCRIPTION
+            f'Subscription already exists: {EXPANDED_SUBSCRIPTION}'
         )
         with pytest.raises(PubSubException) as ctx:
             self.pubsub_hook.create_subscription(
                 project_id=TEST_PROJECT, topic=TEST_TOPIC, subscription=TEST_SUBSCRIPTION, fail_if_exists=True
             )
-        assert str(ctx.value) == 'Subscription already exists: %s' % EXPANDED_SUBSCRIPTION
+        assert str(ctx.value) == f'Subscription already exists: {EXPANDED_SUBSCRIPTION}'
 
     @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
     def test_create_subscription_api_call_error(self, mock_service):
         mock_service.create_subscription.side_effect = GoogleAPICallError(
-            'Error creating subscription %s' % EXPANDED_SUBSCRIPTION
+            f'Error creating subscription {EXPANDED_SUBSCRIPTION}'
         )
         with pytest.raises(PubSubException):
             self.pubsub_hook.create_subscription(
@@ -363,7 +363,7 @@ class TestPubSubHook(unittest.TestCase):
     @mock.patch(PUBSUB_STRING.format('PubSubHook.subscriber_client'))
     def test_create_subscription_nofailifexists(self, mock_service):
         mock_service.create_subscription.side_effect = AlreadyExists(
-            'Subscription already exists: %s' % EXPANDED_SUBSCRIPTION
+            f'Subscription already exists: {EXPANDED_SUBSCRIPTION}'
         )
         response = self.pubsub_hook.create_subscription(
             project_id=TEST_PROJECT, topic=TEST_TOPIC, subscription=TEST_SUBSCRIPTION
diff --git a/tests/providers/google/cloud/operators/test_dataflow.py b/tests/providers/google/cloud/operators/test_dataflow.py
index 7e290d7..c682a31 100644
--- a/tests/providers/google/cloud/operators/test_dataflow.py
+++ b/tests/providers/google/cloud/operators/test_dataflow.py
@@ -55,7 +55,7 @@ DEFAULT_OPTIONS_TEMPLATE = {
     'zone': 'us-central1-f',
 }
 ADDITIONAL_OPTIONS = {'output': 'gs://test/output', 'labels': {'foo': 'bar'}}
-TEST_VERSION = 'v{}'.format(version.replace('.', '-').replace('+', '-'))
+TEST_VERSION = f"v{version.replace('.', '-').replace('+', '-')}"
 EXPECTED_ADDITIONAL_OPTIONS = {
     'output': 'gs://test/output',
     'labels': {'foo': 'bar', 'airflow-version': TEST_VERSION},
diff --git a/tests/providers/google/cloud/operators/test_mlengine_utils.py b/tests/providers/google/cloud/operators/test_mlengine_utils.py
index 539ee60..65b41b6 100644
--- a/tests/providers/google/cloud/operators/test_mlengine_utils.py
+++ b/tests/providers/google/cloud/operators/test_mlengine_utils.py
@@ -28,7 +28,7 @@ from airflow.providers.google.cloud.utils import mlengine_operator_utils
 from airflow.version import version
 
 DEFAULT_DATE = datetime.datetime(2017, 6, 6)
-TEST_VERSION = 'v{}'.format(version.replace('.', '-').replace('+', '-'))
+TEST_VERSION = f"v{version.replace('.', '-').replace('+', '-')}"
 
 
 class TestCreateEvaluateOps(unittest.TestCase):
@@ -80,7 +80,7 @@ class TestCreateEvaluateOps(unittest.TestCase):
             input_paths=input_with_model['inputPaths'],
             prediction_path=input_with_model['outputPath'],
             metric_fn_and_keys=(self.metric_fn, ['err']),
-            validate_fn=(lambda x: 'err=%.1f' % x['err']),
+            validate_fn=(lambda x: f"err={x['err']:.1f}"),
             dag=self.dag,
             py_interpreter="python3",
         )
@@ -168,7 +168,7 @@ class TestCreateEvaluateOps(unittest.TestCase):
             'input_paths': input_with_model['inputPaths'],
             'prediction_path': input_with_model['outputPath'],
             'metric_fn_and_keys': (self.metric_fn, ['err']),
-            'validate_fn': (lambda x: 'err=%.1f' % x['err']),
+            'validate_fn': (lambda x: f"err={x['err']:.1f}"),
         }
 
         with pytest.raises(AirflowException, match='Missing model origin'):
diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py
index e3c8917..77e0c35 100644
--- a/tests/providers/google/cloud/sensors/test_gcs.py
+++ b/tests/providers/google/cloud/sensors/test_gcs.py
@@ -175,7 +175,7 @@ class TestGoogleCloudStoragePrefixSensor(TestCase):
             impersonation_chain=TEST_IMPERSONATION_CHAIN,
             poke_interval=0,
         )
-        generated_messages = ['test-prefix/obj%s' % i for i in range(5)]
+        generated_messages = [f'test-prefix/obj{i}' for i in range(5)]
         mock_hook.return_value.list.return_value = generated_messages
 
         response = task.execute(None)
diff --git a/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py b/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py
index 1621829..b3c0acd 100644
--- a/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py
+++ b/tests/providers/google/cloud/transfers/test_bigquery_to_bigquery.py
@@ -30,7 +30,7 @@ class TestBigQueryToBigQueryOperator(unittest.TestCase):
     @mock.patch('airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryHook')
     def test_execute(self, mock_hook):
         source_project_dataset_tables = f'{TEST_DATASET}.{TEST_TABLE_ID}'
-        destination_project_dataset_table = '{}.{}'.format(TEST_DATASET + '_new', TEST_TABLE_ID)
+        destination_project_dataset_table = f"{TEST_DATASET + '_new'}.{TEST_TABLE_ID}"
         write_disposition = 'WRITE_EMPTY'
         create_disposition = 'CREATE_IF_NEEDED'
         labels = {'k1': 'v1'}
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 5c0c38c..2df9fea 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -143,7 +143,7 @@ class TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase):
             source_bucket=TEST_BUCKET,
             source_object=SOURCE_OBJECT_WILDCARD_FILENAME,
             destination_bucket=DESTINATION_BUCKET,
-            destination_object='{}/{}'.format(DESTINATION_OBJECT_PREFIX, SOURCE_OBJECT_WILDCARD_SUFFIX[:-1]),
+            destination_object=f'{DESTINATION_OBJECT_PREFIX}/{SOURCE_OBJECT_WILDCARD_SUFFIX[:-1]}',
         )
 
         operator.execute(None)
diff --git a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py
index 289eb43..5e19c17 100644
--- a/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py
+++ b/tests/providers/google/cloud/utils/test_mlengine_operator_utils.py
@@ -83,11 +83,11 @@ METRIC_KEYS_EXPECTED = ','.join(METRIC_KEYS)
 
 def validate_err_and_count(summary):
     if summary['err'] > 0.2:
-        raise ValueError('Too high err>0.2; summary=%s' % summary)
+        raise ValueError(f'Too high err>0.2; summary={summary}')
     if summary['mse'] > 0.05:
-        raise ValueError('Too high mse>0.05; summary=%s' % summary)
+        raise ValueError(f'Too high mse>0.05; summary={summary}')
     if summary['count'] < 1000:
-        raise ValueError('Too few instances<1000; summary=%s' % summary)
+        raise ValueError(f'Too few instances<1000; summary={summary}')
     return summary
 
 
diff --git a/tests/providers/mysql/hooks/test_mysql.py b/tests/providers/mysql/hooks/test_mysql.py
index 538381f..19f0bd8 100644
--- a/tests/providers/mysql/hooks/test_mysql.py
+++ b/tests/providers/mysql/hooks/test_mysql.py
@@ -431,10 +431,8 @@ class TestMySql(unittest.TestCase):
             from tests.test_utils.asserts import assert_equal_ignore_multiple_spaces
 
             assert mock_execute.call_count == 1
-            query = """
+            query = f"""
                 SELECT * INTO OUTFILE '{tmp_file}'
                 FROM {table}
-            """.format(
-                tmp_file=tmp_file, table=table
-            )
+            """
             assert_equal_ignore_multiple_spaces(self, mock_execute.call_args[0][0], query)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 2046e22..9c75513 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -197,7 +197,7 @@ def make_user_defined_macro_filter_dag():
         user_defined_macros={
             'next_execution_date': compute_next_execution_date,
         },
-        user_defined_filters={'hello': lambda name: 'Hello %s' % name},
+        user_defined_filters={'hello': lambda name: f'Hello {name}'},
         catchup=False,
     )
     BashOperator(
@@ -731,7 +731,7 @@ class TestStringifiedDAGs(unittest.TestCase):
                 setattr(self, key, value)
 
         def __str__(self):
-            return "{}({})".format(self.__class__.__name__, str(self.__dict__))
+            return f"{self.__class__.__name__}({str(self.__dict__)})"
 
         def __repr__(self):
             return self.__str__()
diff --git a/tests/test_utils/gcp_system_helpers.py b/tests/test_utils/gcp_system_helpers.py
index 6572111..314f09a 100644
--- a/tests/test_utils/gcp_system_helpers.py
+++ b/tests/test_utils/gcp_system_helpers.py
@@ -184,7 +184,7 @@ class GoogleSystemTest(SystemTest):
                 "gsutil",
                 "iam",
                 "ch",
-                "serviceAccount:%s:admin" % account_email,
+                f"serviceAccount:{account_email}:admin",
                 bucket_name,
             ]
         )
diff --git a/tests/test_utils/logging_command_executor.py b/tests/test_utils/logging_command_executor.py
index 1ebf729..5fca244 100644
--- a/tests/test_utils/logging_command_executor.py
+++ b/tests/test_utils/logging_command_executor.py
@@ -57,7 +57,7 @@ class LoggingCommandExecutor(LoggingMixin):
             self.log.info("Stdout: %s", output)
             self.log.info("Stderr: %s", err)
             raise AirflowException(
-                "Retcode {} on {} with stdout: {}, stderr: {}".format(retcode, " ".join(cmd), output, err)
+                f"Retcode {retcode} on {' '.join(cmd)} with stdout: {output}, stderr: {err}"
             )
         return output
 
diff --git a/tests/test_utils/mock_operators.py b/tests/test_utils/mock_operators.py
index 534770e..989b984 100644
--- a/tests/test_utils/mock_operators.py
+++ b/tests/test_utils/mock_operators.py
@@ -82,7 +82,7 @@ class CustomBaseIndexOpLink(BaseOperatorLink):
 
     @property
     def name(self) -> str:
-        return 'BigQuery Console #{index}'.format(index=self.index + 1)
+        return f'BigQuery Console #{self.index + 1}'
 
     def get_link(self, operator, dttm):
         ti = TaskInstance(task=operator, execution_date=dttm)
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index fffa2d4..0477c01 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -43,9 +43,7 @@ class TestHelpers(unittest.TestCase):
         filename_template = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log"
 
         ts = ti.get_template_context()['ts']
-        expected_filename = "{dag_id}/{task_id}/{ts}/{try_number}.log".format(
-            dag_id=dag_id, task_id=task_id, ts=ts, try_number=try_number
-        )
+        expected_filename = f"{dag_id}/{task_id}/{ts}/{try_number}.log"
 
         rendered_filename = helpers.render_log_filename(ti, try_number, filename_template)
 
diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py
index 5981eac..1a379d9 100644
--- a/tests/www/api/experimental/test_endpoints.py
+++ b/tests/www/api/experimental/test_endpoints.py
@@ -368,7 +368,7 @@ class TestPoolApiExperimental(TestBase):
         clear_db_pools()
         self.pools = [Pool.get_default_pool()]
         for i in range(self.USER_POOL_COUNT):
-            name = 'experimental_%s' % (i + 1)
+            name = f'experimental_{i + 1}'
             pool = Pool(
                 pool=name,
                 slots=i,
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 5011547..929076b 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -757,7 +757,7 @@ class TestAirflowBaseViews(TestBase):
         url = 'dag_details?dag_id=test_tree_view'
         resp = self.client.get(url, follow_redirects=True)
         params = {'dag_id': 'test_tree_view', 'origin': '/tree?dag_id=test_tree_view'}
-        href = "/trigger?{}".format(html.escape(urllib.parse.urlencode(params)))
+        href = f"/trigger?{html.escape(urllib.parse.urlencode(params))}"
         self.check_content_in_response(href, resp)
 
     def test_dag_details_trigger_origin_graph_view(self):
@@ -772,7 +772,7 @@ class TestAirflowBaseViews(TestBase):
         url = 'dag_details?dag_id=test_graph_view'
         resp = self.client.get(url, follow_redirects=True)
         params = {'dag_id': 'test_graph_view', 'origin': '/graph?dag_id=test_graph_view'}
-        href = "/trigger?{}".format(html.escape(urllib.parse.urlencode(params)))
+        href = f"/trigger?{html.escape(urllib.parse.urlencode(params))}"
         self.check_content_in_response(href, resp)
 
     def test_dag_details_subdag(self):
@@ -1177,9 +1177,7 @@ class TestLogView(TestBase):
     DAG_ID_REMOVED = 'removed_dag_for_testing_log_view'
     TASK_ID = 'task_for_testing_log_view'
     DEFAULT_DATE = timezone.datetime(2017, 9, 1)
-    ENDPOINT = 'log?dag_id={dag_id}&task_id={task_id}&execution_date={execution_date}'.format(
-        dag_id=DAG_ID, task_id=TASK_ID, execution_date=DEFAULT_DATE
-    )
+    ENDPOINT = f'log?dag_id={DAG_ID}&task_id={TASK_ID}&execution_date={DEFAULT_DATE}'
 
     def setUp(self):
         # Make sure that the configure_logging is not cached
@@ -1277,7 +1275,7 @@ class TestLogView(TestBase):
         for num in range(1, expected_num_logs_visible + 1):
             assert f'log-group-{num}' in response.data.decode('utf-8')
         assert 'log-group-0' not in response.data.decode('utf-8')
-        assert 'log-group-{}'.format(expected_num_logs_visible + 1) not in response.data.decode('utf-8')
+        assert f'log-group-{expected_num_logs_visible + 1}' not in response.data.decode('utf-8')
 
     def test_get_logs_with_metadata_as_download_file(self):
         url_template = (
@@ -1540,7 +1538,7 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester:
         Should set base date to execution date.
         """
         response = self.test.client.get(
-            self.endpoint + '&execution_date={}'.format(self.runs[1].execution_date.isoformat()),
+            self.endpoint + f'&execution_date={self.runs[1].execution_date.isoformat()}',
             data=dict(username='test', password='test'),
             follow_redirects=True,
         )
@@ -1563,7 +1561,7 @@ class ViewWithDateTimeAndNumRunsAndDagRunsFormTester:
         Should set base date and num runs to submitted values.
         """
         response = self.test.client.get(
-            self.endpoint + '&base_date={}&num_runs=2'.format(self.runs[1].execution_date.isoformat()),
+            self.endpoint + f'&base_date={self.runs[1].execution_date.isoformat()}&num_runs=2',
             data=dict(username='test', password='test'),
             follow_redirects=True,
         )
@@ -2849,7 +2847,7 @@ class TestExtraLinks(TestBase):
             name = 'foo-bar'
 
             def get_link(self, operator, dttm):
-                return 'http://www.example.com/{}/{}/{}'.format(operator.task_id, 'foo-bar', dttm)
+                return f"http://www.example.com/{operator.task_id}/foo-bar/{dttm}"
 
         class AirflowLink(BaseOperatorLink):
             name = 'airflow'