Posted to commits@airflow.apache.org by ka...@apache.org on 2021/05/27 05:09:35 UTC

[airflow] branch master updated: Check syntactic correctness for code-snippets (#16005)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 904709d  Check syntactic correctness for code-snippets (#16005)
904709d is described below

commit 904709d34fbe0b6062d72932b72954afe13ec148
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Thu May 27 07:09:25 2021 +0200

    Check syntactic correctness for code-snippets (#16005)
    
    * Check syntactic correctness for code-snippets
    
    * fixup! Check syntactic correctness for code-snippets
    
    * fixup! fixup! Check syntactic correctness for code-snippets
    
    * fixup! fixup! fixup! Check syntactic correctness for code-snippets
---
 .pre-commit-config.yaml                            |   6 +
 BREEZE.rst                                         |  10 +-
 CONTRIBUTING.rst                                   |  13 +-
 CONTRIBUTORS_QUICK_START.rst                       |   9 +-
 STATIC_CODE_CHECKS.rst                             |  17 +-
 TESTING.rst                                        |  22 +-
 UPDATING.md                                        | 165 ++++++++-----
 airflow/models/dag.py                              |   7 +-
 airflow/operators/bash.py                          |   4 +-
 airflow/operators/python.py                        |   2 +
 airflow/providers/amazon/aws/hooks/batch_client.py |   4 +-
 .../providers/amazon/aws/hooks/batch_waiters.py    |  18 +-
 .../amazon/aws/transfers/dynamodb_to_s3.py         |  20 +-
 airflow/providers/apache/beam/CHANGELOG.rst        |  14 +-
 airflow/providers/ftp/hooks/ftp.py                 |  15 +-
 airflow/providers/google/CHANGELOG.rst             |  14 +-
 .../operators/cloud_storage_transfer_service.py    |  22 +-
 .../providers/google/cloud/operators/dataflow.py   |  93 ++++----
 .../providers/google/cloud/operators/dataproc.py   |  10 +-
 airflow/providers/google/cloud/operators/gcs.py    |  12 +-
 .../providers/google/cloud/transfers/s3_to_gcs.py  |  17 +-
 .../google/cloud/utils/mlengine_operator_utils.py  |  25 +-
 .../cloud/utils/mlengine_prediction_summary.py     |  23 +-
 airflow/providers/http/hooks/http.py               |  17 +-
 airflow/providers/telegram/hooks/telegram.py       |   4 +-
 airflow/sensors/date_time.py                       |   4 +-
 airflow/utils/dates.py                             |  53 +++--
 breeze-complete                                    |   1 +
 .../apache-airflow-providers-apache-beam/index.rst |  14 +-
 .../connections/gcp.rst                            |   4 +-
 docs/apache-airflow-providers-google/index.rst     |  14 +-
 docs/apache-airflow-providers-jdbc/operators.rst   |  16 +-
 .../operators/postgres_operator_howto_guide.rst    |  50 ++--
 .../howto/create-update-providers.rst              |  18 +-
 docs/apache-airflow/best-practices.rst             |  89 +++----
 docs/apache-airflow/concepts/dags.rst              |  40 ++--
 docs/apache-airflow/concepts/operators.rst         |  56 ++---
 docs/apache-airflow/concepts/pools.rst             |   4 +-
 docs/apache-airflow/dag-run.rst                    |  25 +-
 docs/apache-airflow/dag-serialization.rst          |   1 +
 docs/apache-airflow/executor/debug.rst             |   9 +-
 docs/apache-airflow/faq.rst                        |  32 +--
 docs/apache-airflow/howto/add-dag-tags.rst         |   6 +-
 docs/apache-airflow/howto/connection.rst           |  61 ++---
 docs/apache-airflow/howto/custom-operator.rst      |  77 +++---
 .../howto/customize-state-colors-ui.rst            |  18 +-
 docs/apache-airflow/howto/define_extra_link.rst    |  36 +--
 docs/apache-airflow/howto/operator/bash.rst        |  21 +-
 docs/apache-airflow/howto/variable.rst             |   1 +
 docs/apache-airflow/kubernetes.rst                 |   3 +-
 docs/apache-airflow/lineage.rst                    |  29 ++-
 docs/apache-airflow/modules_management.rst         |   2 +-
 docs/apache-airflow/plugins.rst                    |  72 +++---
 docs/apache-airflow/security/secrets/fernet.rst    |   5 +-
 docs/apache-airflow/security/secrets/index.rst     |   3 +-
 docs/apache-airflow/timezone.rst                   |  24 +-
 docs/apache-airflow/tutorial_taskflow_api.rst      |   4 +-
 docs/apache-airflow/ui.rst                         |   2 +-
 docs/apache-airflow/upgrading-to-2.rst             | 258 ++++++++++-----------
 tests/dags_corrupted/README.md                     |   3 +-
 tests/test_utils/perf/perf_kit/__init__.py         |  24 +-
 61 files changed, 872 insertions(+), 770 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index dc5591c..b343c50 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -159,6 +159,12 @@ repos:
       - id: black
         args: [--config=./pyproject.toml]
         exclude: ^airflow/_vendor/
+  - repo: https://github.com/asottile/blacken-docs
+    rev: v1.10.0
+    hooks:
+      - id: blacken-docs
+        alias: black
+        additional_dependencies: [black==21.4b2]
   - repo: https://github.com/pre-commit/pre-commit-hooks
     rev: v3.4.0
     hooks:
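
The hook added above runs black over Python snippets embedded in documentation files and docstrings, so a snippet that is not syntactically valid Python fails the check; running "pre-commit run blacken-docs --all-files" exercises it locally. A minimal sketch of the underlying check, assuming only that the black package is installed (the snippet below is illustrative, not taken from the repository):

    import black

    # blacken-docs extracts code blocks from .rst/.md files and docstrings and
    # formats them with black; code that does not parse raises black.InvalidInput.
    snippet = 'dag = DAG("my_dag")\ndummy = DummyOperator(task_id="dummy")\n'
    try:
        print(black.format_str(snippet, mode=black.Mode()))
    except black.InvalidInput as err:
        print(f"snippet is not syntactically correct: {err}")
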
diff --git a/BREEZE.rst b/BREEZE.rst
index 2f0510a..09c835c 100644
--- a/BREEZE.rst
+++ b/BREEZE.rst
@@ -2232,11 +2232,11 @@ This is the current syntax for  `./breeze <./breeze>`_:
 
                  all all-but-pylint airflow-config-yaml airflow-providers-available
                  airflow-provider-yaml-files-ok base-operator bats-tests bats-in-container-tests
-                 black build build-providers-dependencies check-apache-license check-builtin-literals
-                 check-executables-have-shebangs check-hooks-apply check-integrations
-                 check-merge-conflict check-xml consistent-pylint daysago-import-check
-                 debug-statements detect-private-key doctoc dont-use-safe-filter end-of-file-fixer
-                 fix-encoding-pragma flake8 flynt forbid-tabs helm-lint identity
+                 black blacken-docs build build-providers-dependencies check-apache-license
+                 check-builtin-literals check-executables-have-shebangs check-hooks-apply
+                 check-integrations check-merge-conflict check-xml consistent-pylint
+                 daysago-import-check debug-statements detect-private-key doctoc dont-use-safe-filter
+                 end-of-file-fixer fix-encoding-pragma flake8 flynt forbid-tabs helm-lint identity
                  incorrect-use-of-LoggingMixin insert-license isort json-schema language-matters
                  lint-dockerfile lint-openapi markdownlint mermaid mixed-line-ending mypy mypy-helm
                  no-providers-in-core-examples no-relative-imports pre-commit-descriptions
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index 1a480e2..2aa18f9 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -960,9 +960,11 @@ To make this easier there is the ``create_session`` helper:
 
     from airflow.utils.session import create_session
 
+
     def my_call(*args, session):
-      ...
-      # You MUST not commit the session here.
+        ...
+        # You MUST not commit the session here.
+
 
     with create_session() as session:
         my_call(*args, session=session)
@@ -973,12 +975,11 @@ If this function is designed to be called by "end-users" (i.e. DAG authors) then
 
     from airflow.utils.session import provide_session
 
-    ...
 
     @provide_session
-    def my_method(arg, arg, session=None)
-      ...
-      # You SHOULD not commit the session here. The wrapper will take care of commit()/rollback() if exception
+    def my_method(arg, session=None):
+        ...
+        # You SHOULD not commit the session here. The wrapper will take care of commit()/rollback() if exception
 
 Don't use time() for duration calculations
 -----------------------------------------
diff --git a/CONTRIBUTORS_QUICK_START.rst b/CONTRIBUTORS_QUICK_START.rst
index ea5f547..dd8b048 100644
--- a/CONTRIBUTORS_QUICK_START.rst
+++ b/CONTRIBUTORS_QUICK_START.rst
@@ -511,10 +511,11 @@ Setting up Debug
 
   .. code-block:: python
 
-    if __name__ == '__main__':
-      from airflow.utils.state import State
-      dag.clear(dag_run_state=State.NONE)
-      dag.run()
+    if __name__ == "__main__":
+        from airflow.utils.state import State
+
+        dag.clear(dag_run_state=State.NONE)
+        dag.run()
 
 - Add ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` to Environment variable of Run Configuration.
 
diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst
index 8a55dd0..75dc31f 100644
--- a/STATIC_CODE_CHECKS.rst
+++ b/STATIC_CODE_CHECKS.rst
@@ -60,6 +60,8 @@ require Breeze Docker images to be installed locally:
 ----------------------------------- ---------------------------------------------------------------- ------------
 ``black``                             Runs Black (the uncompromising Python code formatter)
 ----------------------------------- ---------------------------------------------------------------- ------------
+``blacken-docs``                      Runs Black on Python code blocks in documentation files
+----------------------------------- ---------------------------------------------------------------- ------------
 ``build``                             Builds image for mypy, pylint, flake8                                *
 ----------------------------------- ---------------------------------------------------------------- ------------
 ``build-providers-dependencies``      Regenerates the JSON file with cross-provider dependencies
@@ -331,15 +333,17 @@ These are guidelines for fixing errors reported by pylint:
 
 .. code-block:: python
 
-    import airflow.*  # pylint: disable=wildcard-import
+    from airflow import *  # pylint: disable=wildcard-import
 
 
 -   If there is a single line where you need to disable a particular error,
     consider adding a comment to the line that causes the problem. For example:
 
+
 .. code-block:: python
 
-    def  MakeSummary(pcoll, metric_fn, metric_keys): # pylint: disable=invalid-name
+    def MakeSummary(pcoll, metric_fn, metric_keys):  # pylint: disable=invalid-name
+        ...
 
 
 -   For multiple lines/block of code, to disable an error, you can surround the
@@ -348,10 +352,13 @@ These are guidelines for fixing errors reported by pylint:
 .. code-block:: python
 
     # pylint: disable=too-few-public-methods
-    class  LoginForm(Form):
+    class LoginForm(Form):
         """Form for the user"""
-        username = StringField('Username', [InputRequired()])
-        password = PasswordField('Password', [InputRequired()])
+
+        username = StringField("Username", [InputRequired()])
+        password = PasswordField("Password", [InputRequired()])
+
+
     # pylint: enable=too-few-public-methods
 
 
diff --git a/TESTING.rst b/TESTING.rst
index dd53a06..81b8579 100644
--- a/TESTING.rst
+++ b/TESTING.rst
@@ -205,6 +205,7 @@ go to the ``chart/tests`` directory and add your unit test by creating a class t
 .. code-block:: python
 
     class TestBaseChartTest(unittest.TestCase):
+        ...
 
 To render the chart create a YAML string with the nested dictionary of options you wish to test. You can then
 use our ``render_chart`` function to render the object of interest into a testable Python dictionary. Once the chart
@@ -226,11 +227,13 @@ Example test here:
 
 
     class TestGitSyncScheduler(unittest.TestCase):
-
         def test_basic(self):
             helm_settings = yaml.safe_load(git_sync_basic)
-            res = render_chart('GIT-SYNC', helm_settings,
-                               show_only=["templates/scheduler/scheduler-deployment.yaml"])
+            res = render_chart(
+                "GIT-SYNC",
+                helm_settings,
+                show_only=["templates/scheduler/scheduler-deployment.yaml"],
+            )
             dep: k8s.V1Deployment = render_k8s_object(res[0], k8s.V1Deployment)
             assert "dags" == dep.spec.template.spec.volumes[1].name
 
@@ -327,10 +330,10 @@ Example of the ``redis`` integration test:
 
     @pytest.mark.integration("redis")
     def test_real_ping(self):
-        hook = RedisHook(redis_conn_id='redis_default')
+        hook = RedisHook(redis_conn_id="redis_default")
         redis = hook.get_conn()
 
-        assert redis.ping(), 'Connection to Redis with PING works.'
+        assert redis.ping(), "Connection to Redis with PING works."
 
 The markers can be specified at the test level or the class level (then all tests in this class
 require an integration). You can add multiple markers with different integrations for tests that
@@ -1270,10 +1273,11 @@ It will run a backfill job:
 
 .. code-block:: python
 
-  if __name__ == '__main__':
-    from airflow.utils.state import State
-    dag.clear(dag_run_state=State.NONE)
-    dag.run()
+  if __name__ == "__main__":
+      from airflow.utils.state import State
+
+      dag.clear(dag_run_state=State.NONE)
+      dag.run()
 
 
 2. Set up ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in the run configuration of your IDE.
diff --git a/UPDATING.md b/UPDATING.md
index bf98557..c7a97df 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -343,18 +343,26 @@ The old syntax of passing `context` as a dictionary will continue to work with t
 
 ```python
 def execution_date_fn(execution_date, ctx):
+    ...
 ```
 
 `execution_date_fn` can take in any number of keyword arguments available in the task context dictionary. The following forms of `execution_date_fn` are all supported:
 
 ```python
 def execution_date_fn(dt):
+    ...
+
 
 def execution_date_fn(execution_date):
+    ...
+
 
 def execution_date_fn(execution_date, ds_nodash):
+    ...
+
 
 def execution_date_fn(execution_date, ds_nodash, dag):
+    ...
 ```
 
 ### The default value for `[webserver] cookie_samesite` has been changed to `Lax`
@@ -545,6 +553,7 @@ If your plugin looked like this and was available through the `test_plugin` path
 def my_stat_name_handler(stat):
     return stat
 
+
 class AirflowTestPlugin(AirflowPlugin):
     name = "test_plugin"
     stat_name_handler = my_stat_name_handler
@@ -720,8 +729,8 @@ If want to use them, or your custom hook inherit them, please use ``airflow.hook
 Previously, you could assign a task to a DAG as follows:
 
 ```python
-dag = DAG('my_dag')
-dummy = DummyOperator(task_id='dummy')
+dag = DAG("my_dag")
+dummy = DummyOperator(task_id="dummy")
 
 dag >> dummy
 ```
@@ -729,8 +738,8 @@ dag >> dummy
 This is no longer supported. Instead, we recommend using the DAG as context manager:
 
 ```python
-with DAG('my_dag') as dag:
-    dummy = DummyOperator(task_id='dummy')
+with DAG("my_dag") as dag:
+    dummy = DummyOperator(task_id="dummy")
 ```
 
 #### Removed deprecated import mechanism
@@ -815,7 +824,8 @@ As a result, the `python_callable` argument was removed. PR: https://github.com/
 def myfunc(execution_date):
     print(execution_date)
 
-python_operator = PythonOperator(task_id='mytask', python_callable=myfunc, dag=dag)
+
+python_operator = PythonOperator(task_id="mytask", python_callable=myfunc, dag=dag)
 ```
 
 Notice you don't have to set provide_context=True, variables from the task context are now automatically detected and provided.
@@ -826,7 +836,8 @@ All context variables can still be provided with a double-asterisk argument:
 def myfunc(**context):
     print(context)  # all variables will be provided to context
 
-python_operator = PythonOperator(task_id='mytask', python_callable=myfunc)
+
+python_operator = PythonOperator(task_id="mytask", python_callable=myfunc)
 ```
 
 The task context variable names are reserved names in the callable function, hence a clash with `op_args` and `op_kwargs` results in an exception:
@@ -835,9 +846,11 @@ The task context variable names are reserved names in the callable function, hen
 def myfunc(dag):
     # raises a ValueError because "dag" is a reserved name
     # valid signature example: myfunc(mydag)
+    print("output")
+
 
 python_operator = PythonOperator(
-    task_id='mytask',
+    task_id="mytask",
     op_args=[1],
     python_callable=myfunc,
 )
@@ -979,28 +992,34 @@ This change is caused by adding `run_type` column to `DagRun`.
 Previous signature:
 
 ```python
-def create_dagrun(self,
-                  run_id,
-                  state,
-                  execution_date=None,
-                  start_date=None,
-                  external_trigger=False,
-                  conf=None,
-                  session=None):
+def create_dagrun(
+    self,
+    run_id,
+    state,
+    execution_date=None,
+    start_date=None,
+    external_trigger=False,
+    conf=None,
+    session=None,
+):
+    ...
 ```
 
 current:
 
 ```python
-def create_dagrun(self,
-                  state,
-                  execution_date=None,
-                  run_id=None,
-                  start_date=None,
-                  external_trigger=False,
-                  conf=None,
-                  run_type=None,
-                  session=None):
+def create_dagrun(
+    self,
+    state,
+    execution_date=None,
+    run_id=None,
+    start_date=None,
+    external_trigger=False,
+    conf=None,
+    run_type=None,
+    session=None,
+):
+    ...
 ```
 
 If user provides `run_id` then the `run_type` will be derived from it by checking prefix, allowed types
@@ -1025,7 +1044,7 @@ Previously, there were defined in various places, example as `ID_PREFIX` class v
 
 Was:
 
-```python
+```pycon
 >> from airflow.models.dagrun import DagRun
 >> DagRun.ID_PREFIX
 scheduled__
@@ -1033,7 +1052,7 @@ scheduled__
 
 Replaced by:
 
-```python
+```pycon
 >> from airflow.utils.types import DagRunType
 >> DagRunType.SCHEDULED.value
 scheduled
@@ -1094,8 +1113,9 @@ from airflow.utils.log.logging_mixin import StreamLogWriter
 
 logger = logging.getLogger("custom-logger")
 
-with redirect_stdout(StreamLogWriter(logger, logging.INFO)), \
-        redirect_stderr(StreamLogWriter(logger, logging.WARN)):
+with redirect_stdout(StreamLogWriter(logger, logging.INFO)), redirect_stderr(
+    StreamLogWriter(logger, logging.WARN)
+):
     print("I Love Airflow")
 ```
 
@@ -1117,23 +1137,25 @@ are deprecated and will be removed in future versions.
 **Previous signature**:
 
 ```python
-DagBag(
+def __init__(
     dag_folder=None,
-    include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
-    safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
-    store_serialized_dags=False
+    include_examples=conf.getboolean("core", "LOAD_EXAMPLES"),
+    safe_mode=conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
+    store_serialized_dags=False,
 ):
+    ...
 ```
 
 **current**:
 
 ```python
-DagBag(
+def __init__(
     dag_folder=None,
-    include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
-    safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
-    read_dags_from_db=False
+    include_examples=conf.getboolean("core", "LOAD_EXAMPLES"),
+    safe_mode=conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
+    read_dags_from_db=False,
 ):
+    ...
 ```
 
 If you were using positional arguments, it requires no change but if you were using keyword
@@ -1626,13 +1648,23 @@ We changed signature of BigQueryGetDatasetTablesOperator.
 Before:
 
 ```python
-BigQueryGetDatasetTablesOperator(dataset_id: str, dataset_resource: dict, ...)
+def __init__(
+    dataset_id: str,
+    dataset_resource: dict,
+    # ...
+):
+    ...
 ```
 
 After:
 
 ```python
-BigQueryGetDatasetTablesOperator(dataset_resource: dict, dataset_id: Optional[str] = None, ...)
+def __init__(
+    dataset_resource: dict,
+    dataset_id: Optional[str] = None,
+    # ...
+):
+    ...
 ```
 
 ### Changes in `amazon` provider package
@@ -1793,7 +1825,7 @@ For example:
 from airflow.providers.cloudant.hooks.cloudant import CloudantHook
 
 with CloudantHook().get_conn() as cloudant_session:
-    database = cloudant_session['database_name']
+    database = cloudant_session["database_name"]
 ```
 
 See the [docs](https://python-cloudant.readthedocs.io/en/latest/) for more information on how to use the new cloudant version.
@@ -2101,8 +2133,8 @@ back was ``None``. This will now return an empty string (`'''`)
 Example:
 
 ```python
->> Variable.set('test_key', '')
->> Variable.get('test_key')
+Variable.set("test_key", "")
+Variable.get("test_key")
 ```
 
 The above code returned `None` previously, now it will return `''`.
@@ -2168,8 +2200,8 @@ the `attr` argument is no longer required (or accepted).
 In order to use this function in subclasses of the `BaseOperator`, the `attr` argument must be removed:
 
 ```python
-result = self.render_template('myattr', self.myattr, context)  # Pre-1.10.6 call
-...
+result = self.render_template("myattr", self.myattr, context)  # Pre-1.10.6 call
+# ...
 result = self.render_template(self.myattr, context)  # Post-1.10.6 call
 ```
 
@@ -2258,6 +2290,7 @@ Old signature:
 
 ```python
 def get_task_instances(self, session, start_date=None, end_date=None):
+    ...
 ```
 
 New signature:
@@ -2265,6 +2298,7 @@ New signature:
 ```python
 @provide_session
 def get_task_instances(self, start_date=None, end_date=None, session=None):
+    ...
 ```
 
 #### For `DAG`
@@ -2272,16 +2306,16 @@ def get_task_instances(self, start_date=None, end_date=None, session=None):
 Old signature:
 
 ```python
-def get_task_instances(
-    self, session, start_date=None, end_date=None, state=None):
+def get_task_instances(self, session, start_date=None, end_date=None, state=None):
+    ...
 ```
 
 New signature:
 
 ```python
 @provide_session
-def get_task_instances(
-    self, start_date=None, end_date=None, state=None, session=None):
+def get_task_instances(self, start_date=None, end_date=None, state=None, session=None):
+    ...
 ```
 
 In either case, it is necessary to rewrite calls to the `get_task_instances` method that currently provide the `session` positional argument. New calls to this method look like:
@@ -2413,23 +2447,25 @@ Old signature:
 
 ```python
 def create_transfer_job(self, description, schedule, transfer_spec, project_id=None):
+    ...
 ```
 
 New signature:
 
 ```python
 def create_transfer_job(self, body):
+    ...
 ```
 
 It is necessary to rewrite calls to method. The new call looks like this:
 
 ```python
 body = {
-  'status': 'ENABLED',
-  'projectId': project_id,
-  'description': description,
-  'transferSpec': transfer_spec,
-  'schedule': schedule,
+    "status": "ENABLED",
+    "projectId": project_id,
+    "description": description,
+    "transferSpec": transfer_spec,
+    "schedule": schedule,
 }
 gct_hook.create_transfer_job(body)
 ```
@@ -2444,12 +2480,16 @@ Old signature:
 
 ```python
 def wait_for_transfer_job(self, job):
+    ...
 ```
 
 New signature:
 
 ```python
-def wait_for_transfer_job(self, job, expected_statuses=(GcpTransferOperationStatus.SUCCESS, )):
+def wait_for_transfer_job(
+    self, job, expected_statuses=(GcpTransferOperationStatus.SUCCESS,)
+):
+    ...
 ```
 
 The behavior of `wait_for_transfer_job` has changed:
@@ -2542,7 +2582,7 @@ previously you had this in your user class
 
 ```python
 def is_active(self):
-  return self.active
+    return self.active
 ```
 
 then you need to change it like this
@@ -2550,7 +2590,7 @@ then you need to change it like this
 ```python
 @property
 def is_active(self):
-  return self.active
+    return self.active
 ```
 
 ### Support autodetected schemas to GoogleCloudStorageToBigQueryOperator
@@ -2563,24 +2603,27 @@ define a schema_fields:
 
 ```python
 gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
-  ...
-  schema_fields={...})
+    # ...
+    schema_fields={...}
+)
 ```
 
 or define a schema_object:
 
 ```python
 gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
-  ...
-  schema_object='path/to/schema/object')
+    # ...
+    schema_object="path/to/schema/object"
+)
 ```
 
 or enabled autodetect of schema:
 
 ```python
 gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
-  ...
-  autodetect=True)
+    # ...
+    autodetect=True
+)
 ```
 
 ## Airflow 1.10.1
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 1cf3797..5433ad5 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2378,11 +2378,12 @@ class DagContext:
     .. code-block:: python
 
         with DAG(
-            dag_id='example_dag',
+            dag_id="example_dag",
             default_args=default_args,
-            schedule_interval='0 0 * * *',
-            dagrun_timeout=timedelta(minutes=60)
+            schedule_interval="0 0 * * *",
+            dagrun_timeout=timedelta(minutes=60),
         ) as dag:
+            ...
 
     If you do this the context stores the DAG and whenever new task is created, it will use
     such stored DAG as the parent DAG.
diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py
index 66a70a9..44e1aa2 100644
--- a/airflow/operators/bash.py
+++ b/airflow/operators/bash.py
@@ -113,8 +113,8 @@ class BashOperator(BaseOperator):
 
         bash_task = BashOperator(
             task_id="bash_task",
-            bash_command='echo "here is the message: \'$message\'"',
-            env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
+            bash_command="echo \"here is the message: '$message'\"",
+            env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'},
         )
 
     """
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index fa8020c..83d1018 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -456,6 +456,8 @@ def get_current_context() -> Dict[str, Any]:
     .. code:: python
 
         from airflow.operators.python import get_current_context
+
+
         def my_task():
             context = get_current_context()
             ti = context["ti"]
diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py
index 6aa7156..fd54545 100644
--- a/airflow/providers/amazon/aws/hooks/batch_client.py
+++ b/airflow/providers/amazon/aws/hooks/batch_client.py
@@ -89,7 +89,8 @@ class AwsBatchProtocol(Protocol):
             .. code-block:: python
 
                 import boto3
-                boto3.client('batch').waiter_names == []
+
+                boto3.client("batch").waiter_names == []
 
         .. seealso::
 
@@ -511,6 +512,7 @@ class AwsBatchClientHook(AwsBaseHook):
                 delay = min(max_interval, delay)
                 print(delay / 3, delay)
 
+
             for tries in range(10):
                 exp(tries)
 
diff --git a/airflow/providers/amazon/aws/hooks/batch_waiters.py b/airflow/providers/amazon/aws/hooks/batch_waiters.py
index 9b739dc..ef5f8d3 100644
--- a/airflow/providers/amazon/aws/hooks/batch_waiters.py
+++ b/airflow/providers/amazon/aws/hooks/batch_waiters.py
@@ -44,7 +44,6 @@ class AwsBatchWaitersHook(AwsBatchClientHook):
     """
     A utility to manage waiters for AWS batch services.
 
-    Examples:
     .. code-block:: python
 
         import random
@@ -60,7 +59,7 @@ class AwsBatchWaitersHook(AwsBatchClientHook):
         # modify custom_config['waiters'] as necessary and get a new instance:
         waiters = AwsBatchWaiters(waiter_config=custom_config)
         waiters.waiter_config  # check the custom configuration (this is a deepcopy)
-        waiters.list_waiters() # names of custom waiters
+        waiters.list_waiters()  # names of custom waiters
 
         # During the init for AwsBatchWaiters, the waiter_config is used to build a waiter_model;
         # and note that this only occurs during the class init, to avoid any accidental mutations
@@ -71,12 +70,16 @@ class AwsBatchWaitersHook(AwsBatchClientHook):
         # and the details of the config on that waiter can be further modified without any
         # accidental impact on the generation of new waiters from the defined waiter_model, e.g.
         waiters.get_waiter("JobExists").config.delay  # -> 5
-        waiter = waiters.get_waiter("JobExists")  # -> botocore.waiter.Batch.Waiter.JobExists object
+        waiter = waiters.get_waiter(
+            "JobExists"
+        )  # -> botocore.waiter.Batch.Waiter.JobExists object
         waiter.config.delay = 10
         waiters.get_waiter("JobExists").config.delay  # -> 5 as defined by waiter_model
 
         # To use a specific waiter, update the config and call the `wait()` method for jobId, e.g.
-        waiter = waiters.get_waiter("JobExists")  # -> botocore.waiter.Batch.Waiter.JobExists object
+        waiter = waiters.get_waiter(
+            "JobExists"
+        )  # -> botocore.waiter.Batch.Waiter.JobExists object
         waiter.config.delay = random.uniform(1, 10)  # seconds
         waiter.config.max_attempts = 10
         waiter.wait(jobs=[jobId])
@@ -154,7 +157,8 @@ class AwsBatchWaitersHook(AwsBatchClientHook):
         The ``.waiter_model`` is combined with the ``.client`` to get a specific waiter and
         the properties of that waiter can be modified without any accidental impact on the
         generation of new waiters from the ``.waiter_model``, e.g.
-        .. code-block::
+
+        .. code-block:: python
 
             waiters.get_waiter("JobExists").config.delay  # -> 5
             waiter = waiters.get_waiter("JobExists")  # a new waiter object
@@ -162,9 +166,11 @@ class AwsBatchWaitersHook(AwsBatchClientHook):
             waiters.get_waiter("JobExists").config.delay  # -> 5 as defined by waiter_model
 
         To use a specific waiter, update the config and call the `wait()` method for jobId, e.g.
-        .. code-block::
+
+        .. code-block:: python
 
             import random
+
             waiter = waiters.get_waiter("JobExists")  # a new waiter object
             waiter.config.delay = random.uniform(1, 10)  # seconds
             waiter.config.max_attempts = 10
diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
index 7e0e410..9bc467a 100644
--- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
@@ -63,23 +63,23 @@ class DynamoDBToS3Operator(BaseOperator):
     .. code-block:: python
 
        op1 = DynamoDBToS3Operator(
-           task_id='replicator-1',
-           dynamodb_table_name='hello',
+           task_id="replicator-1",
+           dynamodb_table_name="hello",
            dynamodb_scan_kwargs={
-               'TotalSegments': 2,
-               'Segment': 0,
+               "TotalSegments": 2,
+               "Segment": 0,
            },
-           ...
+           ...,
        )
 
        op2 = DynamoDBToS3Operator(
-           task_id='replicator-2',
-           dynamodb_table_name='hello',
+           task_id="replicator-2",
+           dynamodb_table_name="hello",
            dynamodb_scan_kwargs={
-               'TotalSegments': 2,
-               'Segment': 1,
+               "TotalSegments": 2,
+               "Segment": 1,
            },
-           ...
+           ...,
        )
 
     :param dynamodb_table_name: Dynamodb table to replicate data from
diff --git a/airflow/providers/apache/beam/CHANGELOG.rst b/airflow/providers/apache/beam/CHANGELOG.rst
index b48aaf5..ed98e31 100644
--- a/airflow/providers/apache/beam/CHANGELOG.rst
+++ b/airflow/providers/apache/beam/CHANGELOG.rst
@@ -51,17 +51,19 @@ This is the extra for the ``google`` provider:
 
 .. code-block:: python
 
-        extras_require={
-            ...
-            'apache.beam': ['apache-airflow-providers-apache-beam', 'apache-beam[gcp]'],
-            ....
-        },
+        extras_require = (
+            {
+                # ...
+                "apache.beam": ["apache-airflow-providers-apache-beam", "apache-beam[gcp]"],
+                # ...
+            },
+        )
 
 And likewise this is the extra for the ``apache.beam`` provider:
 
 .. code-block:: python
 
-        extras_require={'google': ['apache-airflow-providers-google', 'apache-beam[gcp]']},
+        extras_require = ({"google": ["apache-airflow-providers-google", "apache-beam[gcp]"]},)
 
 You can still run this with PIP version <= 20.2.4 and go back to the previous behaviour:
 
diff --git a/airflow/providers/ftp/hooks/ftp.py b/airflow/providers/ftp/hooks/ftp.py
index a03e461..ac35aa4 100644
--- a/airflow/providers/ftp/hooks/ftp.py
+++ b/airflow/providers/ftp/hooks/ftp.py
@@ -143,23 +143,28 @@ class FTPHook(BaseHook):
 
         .. code-block:: python
 
-            hook = FTPHook(ftp_conn_id='my_conn')
+            hook = FTPHook(ftp_conn_id="my_conn")
 
-            remote_path = '/path/to/remote/file'
-            local_path = '/path/to/local/file'
+            remote_path = "/path/to/remote/file"
+            local_path = "/path/to/local/file"
 
             # with a custom callback (in this case displaying progress on each read)
             def print_progress(percent_progress):
-                self.log.info('Percent Downloaded: %s%%' % percent_progress)
+                self.log.info("Percent Downloaded: %s%%" % percent_progress)
+
 
             total_downloaded = 0
             total_file_size = hook.get_size(remote_path)
-            output_handle = open(local_path, 'wb')
+            output_handle = open(local_path, "wb")
+
+
             def write_to_file_with_progress(data):
                 total_downloaded += len(data)
                 output_handle.write(data)
                 percent_progress = (total_downloaded / total_file_size) * 100
                 print_progress(percent_progress)
+
+
             hook.retrieve_file(remote_path, None, callback=write_to_file_with_progress)
 
             # without a custom callback data is written to the local_path
diff --git a/airflow/providers/google/CHANGELOG.rst b/airflow/providers/google/CHANGELOG.rst
index 4d2f5a1..aa4f7d8 100644
--- a/airflow/providers/google/CHANGELOG.rst
+++ b/airflow/providers/google/CHANGELOG.rst
@@ -57,17 +57,19 @@ This is the extra for the ``google`` provider:
 
 .. code-block:: python
 
-        extras_require={
-            ...
-            'apache.beam': ['apache-airflow-providers-apache-beam', 'apache-beam[gcp]'],
-            ....
-        },
+        extras_require = (
+            {
+                # ...
+                "apache.beam": ["apache-airflow-providers-apache-beam", "apache-beam[gcp]"],
+                # ...
+            },
+        )
 
 And likewise this is the extra for the ``apache.beam`` provider:
 
 .. code-block:: python
 
-        extras_require={'google': ['apache-airflow-providers-google', 'apache-beam[gcp]']},
+        extras_require = ({"google": ["apache-airflow-providers-google", "apache-beam[gcp]"]},)
 
 You can still run this with PIP version <= 20.2.4 and go back to the previous behaviour:
 
diff --git a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
index 6c803ef..09794c7 100644
--- a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
+++ b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py
@@ -751,11 +751,12 @@ class CloudDataTransferServiceS3ToGCSOperator(BaseOperator):
     .. code-block:: python
 
        s3_to_gcs_transfer_op = S3ToGoogleCloudStorageTransferOperator(
-            task_id='s3_to_gcs_transfer_example',
-            s3_bucket='my-s3-bucket',
-            project_id='my-gcp-project',
-            gcs_bucket='my-gcs-bucket',
-            dag=my_dag)
+           task_id="s3_to_gcs_transfer_example",
+           s3_bucket="my-s3-bucket",
+           project_id="my-gcp-project",
+           gcs_bucket="my-gcs-bucket",
+           dag=my_dag,
+       )
 
     :param s3_bucket: The S3 bucket where to find the objects. (templated)
     :type s3_bucket: str
@@ -921,11 +922,12 @@ class CloudDataTransferServiceGCSToGCSOperator(BaseOperator):
     .. code-block:: python
 
        gcs_to_gcs_transfer_op = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
-            task_id='gcs_to_gcs_transfer_example',
-            source_bucket='my-source-bucket',
-            destination_bucket='my-destination-bucket',
-            project_id='my-gcp-project',
-            dag=my_dag)
+           task_id="gcs_to_gcs_transfer_example",
+           source_bucket="my-source-bucket",
+           destination_bucket="my-destination-bucket",
+           project_id="my-gcp-project",
+           dag=my_dag,
+       )
 
     :param source_bucket: The source Google Cloud Storage bucket where the
          object is. (templated)
diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index cb31c11..010c9b6 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -181,36 +181,36 @@ class DataflowCreateJavaJobOperator(BaseOperator):
     **Example**: ::
 
         default_args = {
-            'owner': 'airflow',
-            'depends_on_past': False,
-            'start_date':
-                (2016, 8, 1),
-            'email': ['alex@vanboxel.be'],
-            'email_on_failure': False,
-            'email_on_retry': False,
-            'retries': 1,
-            'retry_delay': timedelta(minutes=30),
-            'dataflow_default_options': {
-                'project': 'my-gcp-project',
-                'zone': 'us-central1-f',
-                'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
-            }
+            "owner": "airflow",
+            "depends_on_past": False,
+            "start_date": (2016, 8, 1),
+            "email": ["alex@vanboxel.be"],
+            "email_on_failure": False,
+            "email_on_retry": False,
+            "retries": 1,
+            "retry_delay": timedelta(minutes=30),
+            "dataflow_default_options": {
+                "project": "my-gcp-project",
+                "zone": "us-central1-f",
+                "stagingLocation": "gs://bucket/tmp/dataflow/staging/",
+            },
         }
 
-        dag = DAG('test-dag', default_args=default_args)
+        dag = DAG("test-dag", default_args=default_args)
 
         task = DataflowCreateJavaJobOperator(
-            gcp_conn_id='gcp_default',
-            task_id='normalize-cal',
-            jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
+            gcp_conn_id="gcp_default",
+            task_id="normalize-cal",
+            jar="{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar",
             options={
-                'autoscalingAlgorithm': 'BASIC',
-                'maxNumWorkers': '50',
-                'start': '{{ds}}',
-                'partitionType': 'DAY'
-
+                "autoscalingAlgorithm": "BASIC",
+                "maxNumWorkers": "50",
+                "start": "{{ds}}",
+                "partitionType": "DAY",
             },
-            dag=dag)
+            dag=dag,
+        )
+
 
     .. seealso::
         For more detail on job submission have a look at the reference:
@@ -316,9 +316,9 @@ class DataflowCreateJavaJobOperator(BaseOperator):
     .. code-block:: python
 
        default_args = {
-           'dataflow_default_options': {
-               'zone': 'europe-west1-d',
-               'stagingLocation': 'gs://my-staging-bucket/staging/'
+           "dataflow_default_options": {
+               "zone": "europe-west1-d",
+               "stagingLocation": "gs://my-staging-bucket/staging/",
            }
        }
 
@@ -330,17 +330,18 @@ class DataflowCreateJavaJobOperator(BaseOperator):
     .. code-block:: python
 
        t1 = DataflowCreateJavaJobOperator(
-           task_id='dataflow_example',
-           jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
+           task_id="dataflow_example",
+           jar="{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar",
            options={
-               'autoscalingAlgorithm': 'BASIC',
-               'maxNumWorkers': '50',
-               'start': '{{ds}}',
-               'partitionType': 'DAY',
-               'labels': {'foo' : 'bar'}
+               "autoscalingAlgorithm": "BASIC",
+               "maxNumWorkers": "50",
+               "start": "{{ds}}",
+               "partitionType": "DAY",
+               "labels": {"foo": "bar"},
            },
-           gcp_conn_id='airflow-conn-id',
-           dag=my-dag)
+           gcp_conn_id="airflow-conn-id",
+           dag=my_dag,
+       )
 
     """
 
@@ -583,10 +584,9 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
     .. code-block:: python
 
        default_args = {
-           'dataflow_default_options': {
-               'zone': 'europe-west1-d',
-               'tempLocation': 'gs://my-staging-bucket/staging/',
-               }
+           "dataflow_default_options": {
+               "zone": "europe-west1-d",
+               "tempLocation": "gs://my-staging-bucket/staging/",
            }
        }
 
@@ -597,14 +597,15 @@ class DataflowTemplatedJobStartOperator(BaseOperator):
     .. code-block:: python
 
        t1 = DataflowTemplatedJobStartOperator(
-           task_id='dataflow_example',
-           template='{{var.value.gcp_dataflow_base}}',
+           task_id="dataflow_example",
+           template="{{var.value.gcp_dataflow_base}}",
            parameters={
-               'inputFile': "gs://bucket/input/my_input.txt",
-               'outputFile': "gs://bucket/output/my_output.txt"
+               "inputFile": "gs://bucket/input/my_input.txt",
+               "outputFile": "gs://bucket/output/my_output.txt",
            },
-           gcp_conn_id='airflow-conn-id',
-           dag=my-dag)
+           gcp_conn_id="airflow-conn-id",
+           dag=my_dag,
+       )
 
     ``template``, ``dataflow_default_options``, ``parameters``, and ``job_name`` are
     templated so you can use variables in them.
diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index fea5acc8..6952139 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1108,11 +1108,11 @@ class DataprocSubmitPigJobOperator(DataprocJobBaseOperator):
     .. code-block:: python
 
         default_args = {
-            'cluster_name': 'cluster-1',
-            'dataproc_pig_jars': [
-                'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
-                'gs://example/udf/jar/gpig/1.2/gpig.jar'
-            ]
+            "cluster_name": "cluster-1",
+            "dataproc_pig_jars": [
+                "gs://example/udf/jar/datafu/1.2.0/datafu.jar",
+                "gs://example/udf/jar/gpig/1.2/gpig.jar",
+            ],
         }
 
     You can pass a pig script as string or file reference. Use variables to pass on
diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py
index 5fa1eb2..f38bea9 100644
--- a/airflow/providers/google/cloud/operators/gcs.py
+++ b/airflow/providers/google/cloud/operators/gcs.py
@@ -96,12 +96,12 @@ class GCSCreateBucketOperator(BaseOperator):
     .. code-block:: python
 
         CreateBucket = GoogleCloudStorageCreateBucketOperator(
-            task_id='CreateNewBucket',
-            bucket_name='test-bucket',
-            storage_class='MULTI_REGIONAL',
-            location='EU',
-            labels={'env': 'dev', 'team': 'airflow'},
-            gcp_conn_id='airflow-conn-id'
+            task_id="CreateNewBucket",
+            bucket_name="test-bucket",
+            storage_class="MULTI_REGIONAL",
+            location="EU",
+            labels={"env": "dev", "team": "airflow"},
+            gcp_conn_id="airflow-conn-id",
         )
 
     """
diff --git a/airflow/providers/google/cloud/transfers/s3_to_gcs.py b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
index 81cd6c4..4787582 100644
--- a/airflow/providers/google/cloud/transfers/s3_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/s3_to_gcs.py
@@ -87,14 +87,15 @@ class S3ToGCSOperator(S3ListOperator):
     .. code-block:: python
 
        s3_to_gcs_op = S3ToGCSOperator(
-            task_id='s3_to_gcs_example',
-            bucket='my-s3-bucket',
-            prefix='data/customers-201804',
-            dest_gcs_conn_id='google_cloud_default',
-            dest_gcs='gs://my.gcs.bucket/some/customers/',
-            replace=False,
-            gzip=True,
-            dag=my-dag)
+           task_id="s3_to_gcs_example",
+           bucket="my-s3-bucket",
+           prefix="data/customers-201804",
+           dest_gcs_conn_id="google_cloud_default",
+           dest_gcs="gs://my.gcs.bucket/some/customers/",
+           replace=False,
+           gzip=True,
+           dag=my_dag,
+       )
 
     Note that ``bucket``, ``prefix``, ``delimiter`` and ``dest_gcs`` are
     templated, so you can use variables in them if you wish.
diff --git a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py
index bd292c9..f99fc33 100644
--- a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py
+++ b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py
@@ -101,21 +101,24 @@ def create_evaluate_ops(  # pylint: disable=too-many-arguments
 
         def get_metric_fn_and_keys():
             import math  # imports should be outside of the metric_fn below.
+
             def error_and_squared_error(inst):
-                label = float(inst['input_label'])
-                classes = float(inst['classes'])  # 0 or 1
-                err = abs(classes-label)
-                squared_err = math.pow(classes-label, 2)
+                label = float(inst["input_label"])
+                classes = float(inst["classes"])  # 0 or 1
+                err = abs(classes - label)
+                squared_err = math.pow(classes - label, 2)
                 return (err, squared_err)  # returns a tuple.
-            return error_and_squared_error, ['err', 'mse']  # key order must match.
+
+            return error_and_squared_error, ["err", "mse"]  # key order must match.
+
 
         def validate_err_and_count(summary):
-            if summary['err'] > 0.2:
-                raise ValueError('Too high err>0.2; summary=%s' % summary)
-            if summary['mse'] > 0.05:
-                raise ValueError('Too high mse>0.05; summary=%s' % summary)
-            if summary['count'] < 1000:
-                raise ValueError('Too few instances<1000; summary=%s' % summary)
+            if summary["err"] > 0.2:
+                raise ValueError("Too high err>0.2; summary=%s" % summary)
+            if summary["mse"] > 0.05:
+                raise ValueError("Too high mse>0.05; summary=%s" % summary)
+            if summary["count"] < 1000:
+                raise ValueError("Too few instances<1000; summary=%s" % summary)
             return summary
 
     For the details on the other BatchPrediction-related arguments (project_id,
diff --git a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py
index d95d8a6..273e5ef 100644
--- a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py
+++ b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py
@@ -89,16 +89,19 @@ To test outside of the dag:
 
 .. code-block:: python
 
-    subprocess.check_call(["python",
-                           "-m",
-                           "airflow.providers.google.cloud.utils.mlengine_prediction_summary",
-                           "--prediction_path=gs://...",
-                           "--metric_fn_encoded=" + metric_fn_encoded,
-                           "--metric_keys=log_loss,mse",
-                           "--runner=DataflowRunner",
-                           "--staging_location=gs://...",
-                           "--temp_location=gs://...",
-                           ])
+    subprocess.check_call(
+        [
+            "python",
+            "-m",
+            "airflow.providers.google.cloud.utils.mlengine_prediction_summary",
+            "--prediction_path=gs://...",
+            "--metric_fn_encoded=" + metric_fn_encoded,
+            "--metric_keys=log_loss,mse",
+            "--runner=DataflowRunner",
+            "--staging_location=gs://...",
+            "--temp_location=gs://...",
+        ]
+    )
 """
 
 import argparse
diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py
index 6cf5f8e..029f5a2 100644
--- a/airflow/providers/http/hooks/http.py
+++ b/airflow/providers/http/hooks/http.py
@@ -170,7 +170,7 @@ class HttpHook(BaseHook):
         :param prepped_request: the prepared request generated in run()
         :type prepped_request: session.prepare_request
         :param extra_options: additional options to be used when executing the request
-            i.e. {'check_response': False} to avoid checking raising exceptions on non 2XX
+            i.e. ``{'check_response': False}`` to avoid checking raising exceptions on non 2XX
             or 3XX status codes
         :type extra_options: dict
         """
@@ -215,16 +215,13 @@ class HttpHook(BaseHook):
 
         .. code-block:: python
 
-            hook = HttpHook(http_conn_id='my_conn',method='GET')
+            hook = HttpHook(http_conn_id="my_conn", method="GET")
             retry_args = dict(
-                 wait=tenacity.wait_exponential(),
-                 stop=tenacity.stop_after_attempt(10),
-                 retry=requests.exceptions.ConnectionError
-             )
-             hook.run_with_advanced_retry(
-                     endpoint='v1/test',
-                     _retry_args=retry_args
-                 )
+                wait=tenacity.wait_exponential(),
+                stop=tenacity.stop_after_attempt(10),
+                retry=requests.exceptions.ConnectionError,
+            )
+            hook.run_with_advanced_retry(endpoint="v1/test", _retry_args=retry_args)
 
         """
         self._retry_obj = tenacity.Retrying(**_retry_args)
diff --git a/airflow/providers/telegram/hooks/telegram.py b/airflow/providers/telegram/hooks/telegram.py
index 8d5e803..a4e2c83 100644
--- a/airflow/providers/telegram/hooks/telegram.py
+++ b/airflow/providers/telegram/hooks/telegram.py
@@ -43,12 +43,12 @@ class TelegramHook(BaseHook):
     .. code-block:: python
 
         # Create hook
-        telegram_hook = TelegramHook(telegram_conn_id='telegram_default')
+        telegram_hook = TelegramHook(telegram_conn_id="telegram_default")
         # or telegram_hook = TelegramHook(telegram_conn_id='telegram_default', chat_id='-1xxx')
         # or telegram_hook = TelegramHook(token='xxx:xxx', chat_id='-1xxx')
 
         # Call method from telegram bot client
-        telegram_hook.send_message(None', {"text": "message", "chat_id": "-1xxx"})
+        telegram_hook.send_message(None, {"text": "message", "chat_id": "-1xxx"})
         # or telegram_hook.send_message(None, {"text": "message"})
 
     :param telegram_conn_id: connection that optionally has Telegram API token in the password field
diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py
index 3229371..7afb421 100644
--- a/airflow/sensors/date_time.py
+++ b/airflow/sensors/date_time.py
@@ -45,8 +45,8 @@ class DateTimeSensor(BaseSensorOperator):
         .. code-block:: python
 
             DateTimeSensor(
-                task_id='wait_for_0100',
-                target_time='{{ next_execution_date.tomorrow().replace(hour=1) }}',
+                task_id="wait_for_0100",
+                target_time="{{ next_execution_date.tomorrow().replace(hour=1) }}",
             )
 
     :param target_time: datetime after which the job succeeds. (templated)
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index de5e52b..2f908b1 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -46,17 +46,21 @@ def date_range(
     can be something that can be added to `datetime.datetime`
     or a cron expression as a `str`
 
-    .. code-block:: python
-
-        date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1))
-            [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0),
-            datetime.datetime(2016, 1, 3, 0, 0)]
-        date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta='0 0 * * *')
-            [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0),
-            datetime.datetime(2016, 1, 3, 0, 0)]
-        date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *")
-            [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0),
-            datetime.datetime(2016, 3, 1, 0, 0)]
+    .. code-block:: pycon
+        >>> from airflow.utils.dates import date_range
+        >>> from datetime import datetime, timedelta
+        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1))
+        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
+        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
+        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
+        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta="0 0 * * *")
+        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
+        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
+        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
+        >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *")
+        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
+        datetime.datetime(2016, 2, 1, 0, 0, tzinfo=Timezone('UTC')),
+        datetime.datetime(2016, 3, 1, 0, 0, tzinfo=Timezone('UTC'))]
 
     :param start_date: anchor date to start the series from
     :type start_date: datetime.datetime
@@ -134,18 +138,21 @@ def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)):
     Returns the datetime of the form start_date + i * delta
     which is closest to dt for any non-negative integer i.
     Note that delta may be a datetime.timedelta or a dateutil.relativedelta
-    >>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1))
-    datetime.datetime(2015, 1, 1, 0, 0)
-    >>> round_time(datetime(2015, 1, 2), relativedelta(months=1))
-    datetime.datetime(2015, 1, 1, 0, 0)
-    >>> round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
-    datetime.datetime(2015, 9, 16, 0, 0)
-    >>> round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
-    datetime.datetime(2015, 9, 15, 0, 0)
-    >>> round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
-    datetime.datetime(2015, 9, 14, 0, 0)
-    >>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
-    datetime.datetime(2015, 9, 14, 0, 0)
+
+    .. code-block:: pycon
+
+        >>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1))
+        datetime.datetime(2015, 1, 1, 0, 0)
+        >>> round_time(datetime(2015, 1, 2), relativedelta(months=1))
+        datetime.datetime(2015, 1, 1, 0, 0)
+        >>> round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
+        datetime.datetime(2015, 9, 16, 0, 0)
+        >>> round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
+        datetime.datetime(2015, 9, 15, 0, 0)
+        >>> round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
+        datetime.datetime(2015, 9, 14, 0, 0)
+        >>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
+        datetime.datetime(2015, 9, 14, 0, 0)
     """
     if isinstance(delta, str):
         # It's cron based, so it's easy
diff --git a/breeze-complete b/breeze-complete
index b0985eb..dd4bbf3 100644
--- a/breeze-complete
+++ b/breeze-complete
@@ -85,6 +85,7 @@ base-operator
 bats-tests
 bats-in-container-tests
 black
+blacken-docs
 build
 build-providers-dependencies
 check-apache-license
diff --git a/docs/apache-airflow-providers-apache-beam/index.rst b/docs/apache-airflow-providers-apache-beam/index.rst
index e3e6a9e..cbc8bd9 100644
--- a/docs/apache-airflow-providers-apache-beam/index.rst
+++ b/docs/apache-airflow-providers-apache-beam/index.rst
@@ -146,17 +146,19 @@ This is the extra for the ``google`` provider:
 
 .. code-block:: python
 
-        extras_require={
-            ...
-            'apache.beam': ['apache-airflow-providers-apache-beam', 'apache-beam[gcp]'],
-            ....
-        },
+        extras_require = (
+            {
+                # ...
+                "apache.beam": ["apache-airflow-providers-apache-beam", "apache-beam[gcp]"],
+                # ....
+            },
+        )
 
 And likewise this is the extra for the ``apache.beam`` provider:
 
 .. code-block:: python
 
-        extras_require={'google': ['apache-airflow-providers-google', 'apache-beam[gcp]']},
+        extras_require = ({"google": ["apache-airflow-providers-google", "apache-beam[gcp]"]},)
 
 You can still run this with PIP version <= 20.2.4 and go back to the previous behaviour:
 
diff --git a/docs/apache-airflow-providers-google/connections/gcp.rst b/docs/apache-airflow-providers-google/connections/gcp.rst
index 7cdc8d1..888ebc4 100644
--- a/docs/apache-airflow-providers-google/connections/gcp.rst
+++ b/docs/apache-airflow-providers-google/connections/gcp.rst
@@ -139,7 +139,9 @@ For example:
 
         import os
 
-        from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator
+        from airflow.providers.google.cloud.operators.bigquery import (
+            BigQueryCreateEmptyDatasetOperator,
+        )
 
         IMPERSONATION_CHAIN = "impersonated_account@your_project_id.iam.gserviceaccount.com"
 
diff --git a/docs/apache-airflow-providers-google/index.rst b/docs/apache-airflow-providers-google/index.rst
index 2f3230a..64392a4 100644
--- a/docs/apache-airflow-providers-google/index.rst
+++ b/docs/apache-airflow-providers-google/index.rst
@@ -211,17 +211,19 @@ This is the extra for the ``google`` provider:
 
 .. code-block:: python
 
-        extras_require={
-            ...
-            'apache.beam': ['apache-airflow-providers-apache-beam', 'apache-beam[gcp]'],
-            ....
-        },
+        extras_require = (
+            {
+                # ...
+                "apache.beam": ["apache-airflow-providers-apache-beam", "apache-beam[gcp]"],
+                # ...
+            },
+        )
 
 And likewise this is the extra for the ``apache.beam`` provider:
 
 .. code-block:: python
 
-        extras_require={'google': ['apache-airflow-providers-google', 'apache-beam[gcp]']},
+        extras_require = ({"google": ["apache-airflow-providers-google", "apache-beam[gcp]"]},)
 
 You can still run this with PIP version <= 20.2.4 and go back to the previous behaviour:
 
diff --git a/docs/apache-airflow-providers-jdbc/operators.rst b/docs/apache-airflow-providers-jdbc/operators.rst
index e0da3ff..ccd8670 100644
--- a/docs/apache-airflow-providers-jdbc/operators.rst
+++ b/docs/apache-airflow-providers-jdbc/operators.rst
@@ -36,7 +36,7 @@ To use this operator you need:
   * Install the python module jaydebeapi:
     .. code-block:: bash
 
-      pip install jaydebeapi
+        pip install apache-airflow[jdbc]
 
   * Install a `JVM <https://adoptopenjdk.net/installation.html>`_ and
     add a ``JAVA_HOME`` env variable.
@@ -53,17 +53,17 @@ database is listening for new connections.
 
   .. code-block:: python
 
-    import apache-airflow[jdbc]
-
     driver_class = "com.exasol.jdbc.EXADriver"
     driver_path = "/opt/airflow/drivers/exasol/EXASolution_JDBC-7.0.2/exajdbc.jar"
-    connection_url =  "jdbc:exa:localhost"
+    connection_url = "jdbc:exa:localhost"
     credentials = ["", ""]
 
-    conn = jaydebeapi.connect(driver_class,
-                              connection_url,
-                              credentials,
-                              driver_path,)
+    conn = jaydebeapi.connect(
+        driver_class,
+        connection_url,
+        credentials,
+        driver_path,
+    )
 
 Usage
 ^^^^^
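
A minimal usage sketch, assuming the ``JdbcOperator`` shipped with this provider and a hypothetical ``exasol_jdbc`` connection:

.. code-block:: python

    from airflow.models import DAG
    from airflow.providers.jdbc.operators.jdbc import JdbcOperator
    from airflow.utils.dates import days_ago

    with DAG("jdbc_example", schedule_interval=None, start_date=days_ago(1)):
        run_query = JdbcOperator(
            task_id="run_query",
            jdbc_conn_id="exasol_jdbc",  # hypothetical connection id
            sql="SELECT 1",
        )
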
diff --git a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
index 8d9bd96..e2392e3 100644
--- a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
+++ b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst
@@ -70,10 +70,10 @@ Now let's refactor ``create_pet_table`` in our DAG:
 .. code-block:: python
 
         create_pet_table = PostgresOperator(
-              task_id="create_pet_table",
-              postgres_conn_id="postgres_default",
-              sql="sql/pet_schema.sql"
-              )
+            task_id="create_pet_table",
+            postgres_conn_id="postgres_default",
+            sql="sql/pet_schema.sql",
+        )
 
 
 Inserting data into a Postgres database table
@@ -94,10 +94,10 @@ We can then create a PostgresOperator task that populate the ``pet`` table.
 .. code-block:: python
 
   populate_pet_table = PostgresOperator(
-            task_id="populate_pet_table",
-            postgres_conn_id="postgres_default",
-            sql="sql/pet_schema.sql"
-            )
+      task_id="populate_pet_table",
+      postgres_conn_id="postgres_default",
+      sql="sql/pet_schema.sql",
+  )
 
 
 Fetching records from your postgres database table
@@ -108,10 +108,10 @@ Fetching records from your postgres database table can be as simple as:
 .. code-block:: python
 
   get_all_pets = PostgresOperator(
-            task_id="get_all_pets",
-            postgres_conn_id="postgres_default",
-            sql="SELECT * FROM pet;"
-            )
+      task_id="get_all_pets",
+      postgres_conn_id="postgres_default",
+      sql="SELECT * FROM pet;",
+  )
 
 
 
@@ -128,14 +128,11 @@ To find the owner of the pet called 'Lester':
 .. code-block:: python
 
   get_birth_date = PostgresOperator(
-            task_id="get_birth_date",
-            postgres_conn_id="postgres_default",
-            sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
-            parameters={
-                'begin_date': '2020-01-01',
-                'end_date': '2020-12-31'
-                }
-            )
+      task_id="get_birth_date",
+      postgres_conn_id="postgres_default",
+      sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
+      parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
+  )
 
 Now let's refactor our ``get_birth_date`` task. Instead of dumping SQL statements directly into our code, let's tidy things up
 by creating a sql file.
@@ -151,14 +148,11 @@ class.
 .. code-block:: python
 
   get_birth_date = PostgresOperator(
-          task_id="get_birth_date",
-          postgres_conn_id="postgres_default",
-          sql="sql/birth_date.sql",
-          params={
-             'begin_date': '2020-01-01',
-              'end_date': '2020-12-31'
-            }
-          )
+      task_id="get_birth_date",
+      postgres_conn_id="postgres_default",
+      sql="sql/birth_date.sql",
+      params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
+  )
 
 The complete Postgres Operator DAG
 ----------------------------------
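
The pieces above can be assembled into a single DAG; the following is an illustrative sketch (the dag_id, dates and task ordering are assumptions):

.. code-block:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.postgres.operators.postgres import PostgresOperator

    with DAG(
        dag_id="postgres_operator_dag",
        start_date=datetime(2020, 2, 2),
        schedule_interval="@once",
        catchup=False,
    ) as dag:
        create_pet_table = PostgresOperator(
            task_id="create_pet_table",
            postgres_conn_id="postgres_default",
            sql="sql/pet_schema.sql",
        )
        populate_pet_table = PostgresOperator(
            task_id="populate_pet_table",
            postgres_conn_id="postgres_default",
            sql="sql/pet_schema.sql",
        )
        get_birth_date = PostgresOperator(
            task_id="get_birth_date",
            postgres_conn_id="postgres_default",
            sql="sql/birth_date.sql",
            params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
        )
        create_pet_table >> populate_pet_table >> get_birth_date
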
diff --git a/docs/apache-airflow-providers/howto/create-update-providers.rst b/docs/apache-airflow-providers/howto/create-update-providers.rst
index 91fb74e..f1426d2 100644
--- a/docs/apache-airflow-providers/howto/create-update-providers.rst
+++ b/docs/apache-airflow-providers/howto/create-update-providers.rst
@@ -196,15 +196,15 @@ any dependency add a empty list.
   .. code-block:: python
 
       PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = {
-          ...
-          'microsoft.winrm': winrm,
-          'mongo': mongo,
-          'mysql': mysql,
-          'neo4j': neo4j,
-          '<NEW_PROVIDER>': [],
-          'odbc': odbc,
-          ...
-          }
+          # ...
+          "microsoft.winrm": winrm,
+          "mongo": mongo,
+          "mysql": mysql,
+          "neo4j": neo4j,
+          "<NEW_PROVIDER>": [],
+          "odbc": odbc,
+          # ...
+      }
 
 In the ``CONTRIBUTING.rst`` add:
 
diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst
index b2ae4ae..9ce345d 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -150,16 +150,17 @@ Unit tests ensure that there is no incorrect code in your DAG. You can write uni
  from airflow.models import DagBag
  import unittest
 
+
  class TestHelloWorldDAG(unittest.TestCase):
-    @classmethod
-    def setUpClass(cls):
-        cls.dagbag = DagBag()
+     @classmethod
+     def setUpClass(cls):
+         cls.dagbag = DagBag()
 
-    def test_dag_loaded(self):
-        dag = self.dagbag.get_dag(dag_id='hello_world')
-        assert self.dagbag.import_errors == {}
-        assert dag is not None
-        assert len(dag.tasks) == 1
+     def test_dag_loaded(self):
+         dag = self.dagbag.get_dag(dag_id="hello_world")
+         assert self.dagbag.import_errors == {}
+         assert dag is not None
+         assert len(dag.tasks) == 1
 
 **Unit test a DAG structure:**
 This is an example test that verifies the structure of a code-generated DAG against a dict object
@@ -167,20 +168,26 @@ This is an example test want to verify the structure of a code-generated DAG aga
 .. code-block:: python
 
  import unittest
+
+
  class testClass(unittest.TestCase):
-     def assertDagDictEqual(self,source,dag):
+     def assertDagDictEqual(self, source, dag):
          assert dag.task_dict.keys() == source.keys()
          for task_id, downstream_list in source.items():
              assert dag.has_task(task_id)
              task = dag.get_task(task_id)
              assert task.downstream_task_ids == set(downstream_list)
+
      def test_dag(self):
-         self.assertDagDictEqual({
-           "DummyInstruction_0": ["DummyInstruction_1"],
-           "DummyInstruction_1": ["DummyInstruction_2"],
-           "DummyInstruction_2": ["DummyInstruction_3"],
-           "DummyInstruction_3": []
-         },dag)
+         self.assertDagDictEqual(
+             {
+                 "DummyInstruction_0": ["DummyInstruction_1"],
+                 "DummyInstruction_1": ["DummyInstruction_2"],
+                 "DummyInstruction_2": ["DummyInstruction_3"],
+                 "DummyInstruction_3": [],
+             },
+             dag,
+         )
 
 **Unit test for custom operator:**
 
@@ -189,23 +196,28 @@ This is an example test want to verify the structure of a code-generated DAG aga
  import unittest
  from airflow.utils.state import State
 
- DEFAULT_DATE = '2019-10-03'
- TEST_DAG_ID = 'test_my_custom_operator'
+ DEFAULT_DATE = "2019-10-03"
+ TEST_DAG_ID = "test_my_custom_operator"
+
 
  class MyCustomOperatorTest(unittest.TestCase):
-    def setUp(self):
-        self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date' : DEFAULT_DATE})
-        self.op = MyCustomOperator(
-            dag=self.dag,
-            task_id='test',
-            prefix='s3://bucket/some/prefix',
-        )
-        self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE)
-
-    def test_execute_no_trigger(self):
-        self.ti.run(ignore_ti_state=True)
-        assert self.ti.state == State.SUCCESS
-        # Assert something related to tasks results
+     def setUp(self):
+         self.dag = DAG(
+             TEST_DAG_ID,
+             schedule_interval="@daily",
+             default_args={"start_date": DEFAULT_DATE},
+         )
+         self.op = MyCustomOperator(
+             dag=self.dag,
+             task_id="test",
+             prefix="s3://bucket/some/prefix",
+         )
+         self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE)
+
+     def test_execute_no_trigger(self):
+         self.ti.run(ignore_ti_state=True)
+         assert self.ti.state == State.SUCCESS
+         # Assert something related to tasks results
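
The test above assumes the relevant objects are already imported; for a self-contained module the imports might look like this (the operator's import path is hypothetical):

.. code-block:: python

    from airflow.models import DAG, TaskInstance

    from my_company.operators.my_custom_operator import MyCustomOperator  # hypothetical path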
 
 Self-Checks
 ------------
@@ -221,10 +233,10 @@ Similarly, if you have a task that starts a microservice in Kubernetes or Mesos,
 
    task = PushToS3(...)
    check = S3KeySensor(
-      task_id='check_parquet_exists',
-      bucket_key="s3://bucket/key/foo.parquet",
-      poke_interval=0,
-      timeout=0
+       task_id="check_parquet_exists",
+       bucket_key="s3://bucket/key/foo.parquet",
+       poke_interval=0,
+       timeout=0,
    )
    task >> check
 
@@ -243,10 +255,7 @@ You can use environment variables to parameterize the DAG.
 
    import os
 
-   dest = os.environ.get(
-      "MY_DAG_DEST_PATH",
-      "s3://default-target/path/"
-   )
+   dest = os.environ.get("MY_DAG_DEST_PATH", "s3://default-target/path/")
 
 Mocking variables and connections
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -257,7 +266,7 @@ For variable, use :envvar:`AIRFLOW_VAR_{KEY}`.
 
 .. code-block:: python
 
-    with mock.patch.dict('os.environ', AIRFLOW_VAR_KEY="env-value"):
+    with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"):
         assert "env-value" == Variable.get("key")
 
 For connection, use :envvar:`AIRFLOW_CONN_{CONN_ID}`.
@@ -271,4 +280,4 @@ For connection, use :envvar:`AIRFLOW_CONN_{CONN_ID}`.
     )
     conn_uri = conn.get_uri()
     with mock.patch.dict("os.environ", AIRFLOW_CONN_MY_CONN=conn_uri):
-      assert "cat" == Connection.get("my_conn").login
+        assert "cat" == Connection.get("my_conn").login
diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index 593eba6..76b9b23 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -243,27 +243,28 @@ The ``BranchPythonOperator`` can also be used with XComs allowing branching cont
 .. code-block:: python
 
     def branch_func(ti):
-        xcom_value = int(ti.xcom_pull(task_ids='start_task'))
+        xcom_value = int(ti.xcom_pull(task_ids="start_task"))
         if xcom_value >= 5:
-            return 'continue_task'
+            return "continue_task"
         else:
-            return 'stop_task'
+            return "stop_task"
+
 
     start_op = BashOperator(
-        task_id='start_task',
+        task_id="start_task",
         bash_command="echo 5",
         xcom_push=True,
         dag=dag,
     )
 
     branch_op = BranchPythonOperator(
-        task_id='branch_task',
+        task_id="branch_task",
         python_callable=branch_func,
         dag=dag,
     )
 
-    continue_op = DummyOperator(task_id='continue_task', dag=dag)
-    stop_op = DummyOperator(task_id='stop_task', dag=dag)
+    continue_op = DummyOperator(task_id="continue_task", dag=dag)
+    stop_op = DummyOperator(task_id="stop_task", dag=dag)
 
     start_op >> branch_op >> [continue_op, stop_op]
 
@@ -348,7 +349,7 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality
 
     .. code-block:: python
 
-        #dags/branch_without_trigger.py
+        # dags/branch_without_trigger.py
         import datetime as dt
 
         from airflow.models import DAG
@@ -356,23 +357,22 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality
         from airflow.operators.python import BranchPythonOperator
 
         dag = DAG(
-            dag_id='branch_without_trigger',
-            schedule_interval='@once',
-            start_date=dt.datetime(2019, 2, 28)
+            dag_id="branch_without_trigger",
+            schedule_interval="@once",
+            start_date=dt.datetime(2019, 2, 28),
         )
 
-        run_this_first = DummyOperator(task_id='run_this_first', dag=dag)
+        run_this_first = DummyOperator(task_id="run_this_first", dag=dag)
         branching = BranchPythonOperator(
-            task_id='branching', dag=dag,
-            python_callable=lambda: 'branch_a'
+            task_id="branching", dag=dag, python_callable=lambda: "branch_a"
         )
 
-        branch_a = DummyOperator(task_id='branch_a', dag=dag)
-        follow_branch_a = DummyOperator(task_id='follow_branch_a', dag=dag)
+        branch_a = DummyOperator(task_id="branch_a", dag=dag)
+        follow_branch_a = DummyOperator(task_id="follow_branch_a", dag=dag)
 
-        branch_false = DummyOperator(task_id='branch_false', dag=dag)
+        branch_false = DummyOperator(task_id="branch_false", dag=dag)
 
-        join = DummyOperator(task_id='join', dag=dag)
+        join = DummyOperator(task_id="join", dag=dag)
 
         run_this_first >> branching
         branching >> branch_a >> follow_branch_a >> join
@@ -462,6 +462,7 @@ To add labels, you can use them directly inline with the ``>>`` and ``<<`` opera
 .. code-block:: python
 
     from airflow.utils.edgemodifier import Label
+
     my_task >> Label("When empty") >> other_task
 
 Or, you can pass a Label object to ``set_upstream``/``set_downstream``:
@@ -469,6 +470,7 @@ Or, you can pass a Label object to ``set_upstream``/``set_downstream``:
 .. code-block:: python
 
     from airflow.utils.edgemodifier import Label
+
     my_task.set_downstream(other_task, Label("When empty"))
 
 Here's an example DAG which illustrates labeling different branches:
@@ -507,7 +509,7 @@ This is especially useful if your tasks are built dynamically from configuration
     ### My great DAG
     """
 
-    dag = DAG('my_dag', default_args=default_args)
+    dag = DAG("my_dag", default_args=default_args)
     dag.doc_md = __doc__
 
     t = BashOperator("foo", dag=dag)
diff --git a/docs/apache-airflow/concepts/operators.rst b/docs/apache-airflow/concepts/operators.rst
index 6c339e9..8d9473f 100644
--- a/docs/apache-airflow/concepts/operators.rst
+++ b/docs/apache-airflow/concepts/operators.rst
@@ -67,10 +67,11 @@ For example, say you want to pass the execution date as an environment variable
   # The execution date as YYYY-MM-DD
   date = "{{ ds }}"
   t = BashOperator(
-      task_id='test_env',
-      bash_command='/tmp/test.sh ',
+      task_id="test_env",
+      bash_command="/tmp/test.sh ",
       dag=dag,
-      env={'EXECUTION_DATE': date})
+      env={"EXECUTION_DATE": date},
+  )
 
 Here, ``{{ ds }}`` is a macro, and because the ``env`` parameter of the ``BashOperator`` is templated with Jinja, the execution date will be available as an environment variable named ``EXECUTION_DATE`` in your Bash script.
 
@@ -81,19 +82,18 @@ You can also use Jinja templating with nested fields, as long as these nested fi
 .. code-block:: python
 
     class MyDataReader:
-        template_fields = ['path']
+        template_fields = ["path"]
 
         def __init__(self, my_path):
             self.path = my_path
 
         # [additional code here...]
 
+
     t = PythonOperator(
-        task_id='transform_data',
-        python_callable=transform_data
-        op_args=[
-            MyDataReader('/tmp/{{ ds }}/my_file')
-        ],
+        task_id="transform_data",
+        python_callable=transform_data,
+        op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
         dag=dag,
     )
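
The ``transform_data`` callable is referenced but not defined in these snippets; a hypothetical definition, purely for illustration, could be:

.. code-block:: python

    def transform_data(reader):
        # `reader` is the MyDataReader instance; its templated `path` has already been rendered.
        with open(reader.path) as f:
            return f.read()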
 
@@ -104,27 +104,27 @@ Deep nested fields can also be substituted, as long as all intermediate fields a
 .. code-block:: python
 
     class MyDataTransformer:
-        template_fields = ['reader']
+        template_fields = ["reader"]
 
         def __init__(self, my_reader):
             self.reader = my_reader
 
         # [additional code here...]
 
+
     class MyDataReader:
-        template_fields = ['path']
+        template_fields = ["path"]
 
         def __init__(self, my_path):
             self.path = my_path
 
         # [additional code here...]
 
+
     t = PythonOperator(
-        task_id='transform_data',
-        python_callable=transform_data
-        op_args=[
-            MyDataTransformer(MyDataReader('/tmp/{{ ds }}/my_file'))
-        ],
+        task_id="transform_data",
+        python_callable=transform_data,
+        op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
         dag=dag,
     )
 
@@ -133,10 +133,10 @@ You can pass custom options to the Jinja ``Environment`` when creating your DAG.
 .. code-block:: python
 
     my_dag = DAG(
-        dag_id='my-dag',
+        dag_id="my-dag",
         jinja_environment_kwargs={
-            'keep_trailing_newline': True,
-             # some other jinja2 Environment options here
+            "keep_trailing_newline": True,
+            # some other jinja2 Environment options here
         },
     )
 
@@ -155,8 +155,9 @@ Now, when the following task is run, ``order_data`` argument is passed a string,
 .. code-block:: python
 
     transform = PythonOperator(
-        task_id="transform", op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
-        python_callable=transform
+        task_id="transform",
+        op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
+        python_callable=transform,
     )
 
 
@@ -172,24 +173,25 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows:
         render_template_as_native_obj=True,
     )
 
+
     def extract():
         data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
         return json.loads(data_string)
 
+
     def transform(order_data):
         print(type(order_data))
         for value in order_data.values():
             total_order_value += value
         return {"total_order_value": total_order_value}
 
-    extract_task = PythonOperator(
-        task_id="extract",
-        python_callable=extract
-    )
+
+    extract_task = PythonOperator(task_id="extract", python_callable=extract)
 
     transform_task = PythonOperator(
-        task_id="transform", op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
-        python_callable=transform
+        task_id="transform",
+        op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
+        python_callable=transform,
     )
 
     extract_task >> transform_task
diff --git a/docs/apache-airflow/concepts/pools.rst b/docs/apache-airflow/concepts/pools.rst
index 5b10fdd..f6c97d7 100644
--- a/docs/apache-airflow/concepts/pools.rst
+++ b/docs/apache-airflow/concepts/pools.rst
@@ -29,9 +29,9 @@ Tasks can then be associated with one of the existing pools by using the ``pool`
 .. code-block:: python
 
     aggregate_db_message_job = BashOperator(
-        task_id='aggregate_db_message_job',
+        task_id="aggregate_db_message_job",
         execution_timeout=timedelta(hours=3),
-        pool='ep_data_pipeline_db_msg_agg',
+        pool="ep_data_pipeline_db_msg_agg",
         bash_command=aggregate_db_message_job_cmd,
         dag=dag,
     )
diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst
index 0752990..0ac9f37 100644
--- a/docs/apache-airflow/dag-run.rst
+++ b/docs/apache-airflow/dag-run.rst
@@ -96,22 +96,23 @@ in the configuration file. When turned off, the scheduler creates a DAG run only
 
 
     default_args = {
-        'owner': 'airflow',
-        'depends_on_past': False,
-        'email': ['airflow@example.com'],
-        'email_on_failure': False,
-        'email_on_retry': False,
-        'retries': 1,
-        'retry_delay': timedelta(minutes=5)
+        "owner": "airflow",
+        "depends_on_past": False,
+        "email": ["airflow@example.com"],
+        "email_on_failure": False,
+        "email_on_retry": False,
+        "retries": 1,
+        "retry_delay": timedelta(minutes=5),
     }
 
     dag = DAG(
-        'tutorial',
+        "tutorial",
         default_args=default_args,
         start_date=datetime(2015, 12, 1),
-        description='A simple tutorial DAG',
-        schedule_interval='@daily',
-        catchup=False)
+        description="A simple tutorial DAG",
+        schedule_interval="@daily",
+        catchup=False,
+    )
 
 In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM
 (or from the command line), a single DAG Run will be created, with an `execution_date` of 2016-01-01,
@@ -214,7 +215,7 @@ Example of a parameterized DAG:
     dag = DAG("example_parameterized_dag", schedule_interval=None, start_date=days_ago(2))
 
     parameterized_task = BashOperator(
-        task_id='parameterized_task',
+        task_id="parameterized_task",
         bash_command="echo value: {{ dag_run.conf['conf1'] }}",
         dag=dag,
     )
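
The ``dag_run.conf`` dictionary read by the task above is supplied at trigger time. As one illustration, another DAG could pass it with ``TriggerDagRunOperator`` (the CLI and REST API accept the same payload); the dag_id ``upstream_dag`` is an assumption:

.. code-block:: python

    from airflow.models import DAG
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    from airflow.utils.dates import days_ago

    with DAG("upstream_dag", schedule_interval=None, start_date=days_ago(1)):
        trigger = TriggerDagRunOperator(
            task_id="trigger_parameterized_dag",
            trigger_dag_id="example_parameterized_dag",
            conf={"conf1": "some value"},  # becomes dag_run.conf in the triggered run
        )
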
diff --git a/docs/apache-airflow/dag-serialization.rst b/docs/apache-airflow/dag-serialization.rst
index 72b8f37..842ab9d 100644
--- a/docs/apache-airflow/dag-serialization.rst
+++ b/docs/apache-airflow/dag-serialization.rst
@@ -114,4 +114,5 @@ define a ``json`` variable in local Airflow settings (``airflow_local_settings.p
 .. code-block:: python
 
     import ujson
+
     json = ujson
diff --git a/docs/apache-airflow/executor/debug.rst b/docs/apache-airflow/executor/debug.rst
index c029826..e956e11 100644
--- a/docs/apache-airflow/executor/debug.rst
+++ b/docs/apache-airflow/executor/debug.rst
@@ -42,10 +42,11 @@ It will run a backfill job:
 
 .. code-block:: python
 
-  if __name__ == '__main__':
-    from airflow.utils.state import State
-    dag.clear(dag_run_state=State.NONE)
-    dag.run()
+  if __name__ == "__main__":
+      from airflow.utils.state import State
+
+      dag.clear(dag_run_state=State.NONE)
+      dag.run()
 
 
 2. Setup ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in run configuration of your IDE. In
diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst
index 5122312..b9f6ccb 100644
--- a/docs/apache-airflow/faq.rst
+++ b/docs/apache-airflow/faq.rst
@@ -180,9 +180,8 @@ fields. They are also included in the context dictionary given to an Operator's
 .. code-block:: python
 
         class MyOperator(BaseOperator):
-
             def execute(self, context):
-                logging.info(context['execution_date'])
+                logging.info(context["execution_date"])
 
 Note that ``ds`` refers to date_string, not date start, which may be confusing to some.
 
@@ -208,11 +207,11 @@ simple dictionary.
 
 
     for i in range(10):
-        dag_id = f'foo_{i}'
+        dag_id = f"foo_{i}"
         globals()[dag_id] = DAG(dag_id)
 
         # or better, call a function that returns a DAG object!
-        other_dag_id = f'bar_{i}'
+        other_dag_id = f"bar_{i}"
         globals()[other_dag_id] = create_dag(other_dag_id)
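
The ``create_dag`` factory is assumed rather than shown; a minimal sketch (schedule and start date are illustrative) could be:

.. code-block:: python

    from airflow.models import DAG
    from airflow.utils.dates import days_ago


    def create_dag(dag_id):
        # Build and return a fully configured DAG object for the given id.
        dag = DAG(dag_id, schedule_interval="@daily", start_date=days_ago(1))
        # ... add tasks to `dag` here ...
        return dag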
 
 Even though Airflow supports multiple DAG definitions per python file, dynamically generated or otherwise, it is not
@@ -244,27 +243,17 @@ commonly attempted in ``user_defined_macros``.
 .. code-block:: python
 
         dag = DAG(
-            ...
-            user_defined_macros={
-                'my_custom_macro': 'day={{ ds }}'
-            }
+            # ...
+            user_defined_macros={"my_custom_macro": "day={{ ds }}"}
         )
 
-        bo = BashOperator(
-            task_id='my_task',
-            bash_command="echo {{ my_custom_macro }}",
-            dag=dag
-        )
+        bo = BashOperator(task_id="my_task", bash_command="echo {{ my_custom_macro }}", dag=dag)
 
 This will echo "day={{ ds }}" instead of "day=2020-01-01" for a dagrun with the execution date 2020-01-01 00:00:00.
 
 .. code-block:: python
 
-        bo = BashOperator(
-            task_id='my_task',
-            bash_command="echo day={{ ds }}",
-            dag=dag
-        )
+        bo = BashOperator(task_id="my_task", bash_command="echo day={{ ds }}", dag=dag)
 
 By using the ds macros directly in the template_field, the rendered value results in "day=2020-01-01".
 
@@ -318,16 +307,15 @@ upstream task.
     def b_func():
         pass
 
-    @dag(
-        schedule_interval='@once',
-        start_date=datetime(2021, 1, 1)
-    )
+
+    @dag(schedule_interval="@once", start_date=datetime(2021, 1, 1))
     def my_dag():
         a = a_func()
         b = b_func()
 
         a >> b
 
+
     dag = my_dag()
 
 See :ref:`concepts:trigger-rules` for more information.
diff --git a/docs/apache-airflow/howto/add-dag-tags.rst b/docs/apache-airflow/howto/add-dag-tags.rst
index 02d7c07..236414a 100644
--- a/docs/apache-airflow/howto/add-dag-tags.rst
+++ b/docs/apache-airflow/howto/add-dag-tags.rst
@@ -32,11 +32,7 @@ In your Dag file, pass a list of tags you want to add to DAG object:
 
 .. code-block:: python
 
-  dag = DAG(
-    dag_id='example_dag_tag',
-    schedule_interval='0 0 * * *',
-    tags=['example']
-  )
+  dag = DAG(dag_id="example_dag_tag", schedule_interval="0 0 * * *", tags=["example"])
 
 
 **Screenshot**:
diff --git a/docs/apache-airflow/howto/connection.rst b/docs/apache-airflow/howto/connection.rst
index 09aa294..2b8afd2 100644
--- a/docs/apache-airflow/howto/connection.rst
+++ b/docs/apache-airflow/howto/connection.rst
@@ -221,15 +221,15 @@ The above URI would produce a ``Connection`` object equivalent to the following:
 .. code-block:: python
 
     Connection(
-        conn_id='',
-        conn_type='my_conn_type',
+        conn_id="",
+        conn_type="my_conn_type",
         description=None,
-        login='my-login',
-        password='my-password',
-        host='my-host',
+        login="my-login",
+        password="my-password",
+        host="my-host",
         port=5432,
-        schema='my-schema',
-        extra=json.dumps(dict(param1='val1', param2='val2'))
+        schema="my-schema",
+        extra=json.dumps(dict(param1="val1", param2="val2")),
     )
 
 
@@ -245,16 +245,15 @@ convenience method :py:meth:`~airflow.models.connection.Connection.get_uri`.  It
 
     >>> import json
     >>> from airflow.models.connection import Connection
-
     >>> c = Connection(
-    >>>     conn_id='some_conn',
-    >>>     conn_type='mysql',
-    >>>     description='connection description',
-    >>>     host='myhost.com',
-    >>>     login='myname',
-    >>>     password='mypassword',
-    >>>     extra=json.dumps(dict(this_param='some val', that_param='other val*')),
-    >>> )
+    ...     conn_id="some_conn",
+    ...     conn_type="mysql",
+    ...     description="connection description",
+    ...     host="myhost.com",
+    ...     login="myname",
+    ...     password="mypassword",
+    ...     extra=json.dumps(dict(this_param="some val", that_param="other val*")),
+    ... )
     >>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
     AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'
 
@@ -288,16 +287,16 @@ For example:
 
 .. code-block:: pycon
 
-    >>> extra_dict = {'my_val': ['list', 'of', 'values'], 'extra': {'nested': {'json': 'val'}}}
+    >>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
     >>> c = Connection(
-    >>>     conn_type='scheme',
-    >>>     host='host/location',
-    >>>     schema='schema',
-    >>>     login='user',
-    >>>     password='password',
-    >>>     port=1234,
-    >>>     extra=json.dumps(extra_dict),
-    >>> )
+    ...     conn_type="scheme",
+    ...     host="host/location",
+    ...     schema="schema",
+    ...     login="user",
+    ...     password="password",
+    ...     port=1234,
+    ...     extra=json.dumps(extra_dict),
+    ... )
     >>> uri = c.get_uri()
     >>> uri
     'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'
@@ -320,7 +319,9 @@ You can verify a URI is parsed correctly like so:
 
     >>> from airflow.models.connection import Connection
 
-    >>> c = Connection(uri='my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2')
+    >>> c = Connection(
+    ...     uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2"
+    ... )
     >>> print(c.login)
     my-login
     >>> print(c.password)
@@ -342,14 +343,18 @@ For example if your password has a ``/``, this fails:
 
 .. code-block:: pycon
 
-    >>> c = Connection(uri='my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1&param2=val2')
+    >>> c = Connection(
+    ...     uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1&param2=val2"
+    ... )
     ValueError: invalid literal for int() with base 10: 'my-pa'
 
 To fix this, you can encode with :func:`~urllib.parse.quote_plus`:
 
 .. code-block:: pycon
 
-    >>> c = Connection(uri='my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2')
+    >>> c = Connection(
+    ...     uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2"
+    ... )
     >>> print(c.password)
     my-pa/ssword
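
A small sketch of producing such an encoded URI with the standard library instead of escaping it by hand:

.. code-block:: python

    from urllib.parse import quote_plus

    password = "my-pa/ssword"
    # quote_plus turns '/' into %2F, so Connection(uri=...) parses the password correctly.
    uri = f"my-conn-type://my-login:{quote_plus(password)}@my-host:5432/my-schema?param1=val1&param2=val2"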
 
diff --git a/docs/apache-airflow/howto/custom-operator.rst b/docs/apache-airflow/howto/custom-operator.rst
index 648ef73..091f076 100644
--- a/docs/apache-airflow/howto/custom-operator.rst
+++ b/docs/apache-airflow/howto/custom-operator.rst
@@ -46,12 +46,9 @@ Let's implement an example ``HelloOperator`` in a new file ``hello_operator.py``
 
         from airflow.models.baseoperator import BaseOperator
 
-        class HelloOperator(BaseOperator):
 
-            def __init__(
-                    self,
-                    name: str,
-                    **kwargs) -> None:
+        class HelloOperator(BaseOperator):
+            def __init__(self, name: str, **kwargs) -> None:
                 super().__init__(**kwargs)
                 self.name = name
 
@@ -75,7 +72,7 @@ You can now use the derived custom operator as follows:
     from custom_operator.hello_operator import HelloOperator
 
     with dag:
-        hello_task = HelloOperator(task_id='sample-task', name='foo_bar')
+        hello_task = HelloOperator(task_id="sample-task", name="foo_bar")
 
 You also can keep using your plugins folder for storing your custom operators. If you have the file
 ``hello_operator.py`` within the plugins folder, you can import the operator as follows:
@@ -110,26 +107,19 @@ Let's extend our previous example to fetch name from MySQL:
 .. code-block:: python
 
     class HelloDBOperator(BaseOperator):
-
-            def __init__(
-                    self,
-                    name: str,
-                    mysql_conn_id: str,
-                    database: str,
-                    **kwargs) -> None:
-                super().__init__(**kwargs)
-                self.name = name
-                self.mysql_conn_id = mysql_conn_id
-                self.database = database
-
-            def execute(self, context):
-                hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
-                         schema=self.database)
-                sql = "select name from user"
-                result = hook.get_first(sql)
-                message = "Hello {}".format(result['name'])
-                print(message)
-                return message
+        def __init__(self, name: str, mysql_conn_id: str, database: str, **kwargs) -> None:
+            super().__init__(**kwargs)
+            self.name = name
+            self.mysql_conn_id = mysql_conn_id
+            self.database = database
+
+        def execute(self, context):
+            hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database)
+            sql = "select name from user"
+            result = hook.get_first(sql)
+            message = "Hello {}".format(result["name"])
+            print(message)
+            return message
 
 When the operator invokes the query on the hook object, a new connection gets created if it doesn't exist.
 The hook retrieves the auth parameters such as username and password from Airflow
@@ -149,9 +139,9 @@ Override ``ui_fgcolor`` to change the color of the label.
 .. code-block:: python
 
         class HelloOperator(BaseOperator):
-            ui_color = '#ff0000'
-            ui_fgcolor = '#000000'
-            ....
+            ui_color = "#ff0000"
+            ui_fgcolor = "#000000"
+            # ...
 
 Templating
 ^^^^^^^^^^^
@@ -163,12 +153,9 @@ the operator.
 
         class HelloOperator(BaseOperator):
 
-            template_fields = ['name']
+            template_fields = ["name"]
 
-            def __init__(
-                    self,
-                    name: str,
-                    **kwargs) -> None:
+            def __init__(self, name: str, **kwargs) -> None:
                 super().__init__(**kwargs)
                 self.name = name
 
@@ -182,7 +169,9 @@ You can use the template as follows:
 .. code-block:: python
 
         with dag:
-            hello_task = HelloOperator(task_id='task_id_1', dag=dag, name='{{ task_instance.task_id }}')
+            hello_task = HelloOperator(
+                task_id="task_id_1", dag=dag, name="{{ task_instance.task_id }}"
+            )
 
 In this example, Jinja looks for the ``name`` parameter and substitutes ``{{ task_instance.task_id }}`` with
 ``task_id_1``.
@@ -197,13 +186,10 @@ with actual value. Note that Jinja substitutes the operator attributes and not t
 
         class HelloOperator(BaseOperator):
 
-            template_fields = ['guest_name']
-            template_ext = ['.sql']
+            template_fields = ["guest_name"]
+            template_ext = [".sql"]
 
-            def __init__(
-                    self,
-                    name: str,
-                    **kwargs) -> None:
+            def __init__(self, name: str, **kwargs) -> None:
                 super().__init__(**kwargs)
                 self.guest_name = name
 
@@ -215,13 +201,10 @@ from template field renders in Web UI. For example:
 .. code-block:: python
 
         class MyRequestOperator(BaseOperator):
-            template_fields = ['request_body']
-            template_fields_renderers = {'request_body': 'json'}
+            template_fields = ["request_body"]
+            template_fields_renderers = {"request_body": "json"}
 
-            def __init__(
-                    self,
-                    request_body: str,
-                    **kwargs) -> None:
+            def __init__(self, request_body: str, **kwargs) -> None:
                 super().__init__(**kwargs)
                 self.request_body = request_body
 
diff --git a/docs/apache-airflow/howto/customize-state-colors-ui.rst b/docs/apache-airflow/howto/customize-state-colors-ui.rst
index ae7827c..db9a919 100644
--- a/docs/apache-airflow/howto/customize-state-colors-ui.rst
+++ b/docs/apache-airflow/howto/customize-state-colors-ui.rst
@@ -33,15 +33,15 @@ following steps:
     .. code-block:: python
 
       STATE_COLORS = {
-        "queued": 'darkgray',
-        "running": '#01FF70',
-        "success": '#2ECC40',
-        "failed": 'firebrick',
-        "up_for_retry": 'yellow',
-        "up_for_reschedule": 'turquoise',
-        "upstream_failed": 'orange',
-        "skipped": 'darkorchid',
-        "scheduled": 'tan',
+          "queued": "darkgray",
+          "running": "#01FF70",
+          "success": "#2ECC40",
+          "failed": "firebrick",
+          "up_for_retry": "yellow",
+          "up_for_reschedule": "turquoise",
+          "upstream_failed": "orange",
+          "skipped": "darkorchid",
+          "scheduled": "tan",
       }
 
 
diff --git a/docs/apache-airflow/howto/define_extra_link.rst b/docs/apache-airflow/howto/define_extra_link.rst
index cf7aba8..2a89323 100644
--- a/docs/apache-airflow/howto/define_extra_link.rst
+++ b/docs/apache-airflow/howto/define_extra_link.rst
@@ -36,16 +36,15 @@ The following code shows how to add extra links to an operator via Plugins:
 
 
     class GoogleLink(BaseOperatorLink):
-        name = 'Google'
+        name = "Google"
 
         def get_link(self, operator, dttm):
             return "https://www.google.com"
 
+
     class MyFirstOperator(BaseOperator):
 
-        operator_extra_links = (
-            GoogleLink(),
-        )
+        operator_extra_links = (GoogleLink(),)
 
         def __init__(self, **kwargs):
             super().__init__(**kwargs)
@@ -53,10 +52,13 @@ The following code shows how to add extra links to an operator via Plugins:
         def execute(self, context):
             self.log.info("Hello World!")
 
+
     # Defining the plugin class
     class AirflowExtraLinkPlugin(AirflowPlugin):
         name = "extra_link_plugin"
-        operator_extra_links = [GoogleLink(), ]
+        operator_extra_links = [
+            GoogleLink(),
+        ]
 
 .. note:: Operator Extra Links should be registered via Airflow Plugins or custom Airflow Provider to work.
 
@@ -83,24 +85,28 @@ tasks using :class:`~airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Ope
   from airflow.models.baseoperator import BaseOperatorLink
   from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
 
+
   class S3LogLink(BaseOperatorLink):
-      name = 'S3'
+      name = "S3"
 
       # Add list of all the operators to which you want to add this OperatorLinks
       # Example: operators = [GCSToS3Operator, GCSToBigQueryOperator]
       operators = [GCSToS3Operator]
 
       def get_link(self, operator, dttm):
-          return 'https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}'.format(
+          return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}".format(
               dag_id=operator.dag_id,
               task_id=operator.task_id,
               execution_date=dttm,
           )
 
+
   # Defining the plugin class
   class AirflowExtraLinkPlugin(AirflowPlugin):
       name = "extra_link_plugin"
-      operator_extra_links = [S3LogLink(), ]
+      operator_extra_links = [
+          S3LogLink(),
+      ]
 
 
 
@@ -117,25 +123,29 @@ Console, but if we wanted to change that link we could:
     from airflow.providers.google.cloud.operators.bigquery import BigQueryOperator
 
     # Change from https to http just to display the override
-    BIGQUERY_JOB_DETAILS_LINK_FMT = 'http://console.cloud.google.com/bigquery?j={job_id}'
+    BIGQUERY_JOB_DETAILS_LINK_FMT = "http://console.cloud.google.com/bigquery?j={job_id}"
 
 
     class BigQueryConsoleLink(BaseOperatorLink):
         """
         Helper class for constructing BigQuery link.
         """
-        name = 'BigQuery Console'
+
+        name = "BigQuery Console"
         operators = [BigQueryOperator]
 
         def get_link(self, operator, dttm):
             ti = TaskInstance(task=operator, execution_date=dttm)
-            job_id = ti.xcom_pull(task_ids=operator.task_id, key='job_id')
-            return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ''
+            job_id = ti.xcom_pull(task_ids=operator.task_id, key="job_id")
+            return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
+
 
     # Defining the plugin class
     class AirflowExtraLinkPlugin(AirflowPlugin):
         name = "extra_link_plugin"
-        operator_extra_links = [BigQueryConsoleLink(), ]
+        operator_extra_links = [
+            BigQueryConsoleLink(),
+        ]
 
 
 **Adding Operator Links via Providers**
diff --git a/docs/apache-airflow/howto/operator/bash.rst b/docs/apache-airflow/howto/operator/bash.rst
index debdd26..6af6be6 100644
--- a/docs/apache-airflow/howto/operator/bash.rst
+++ b/docs/apache-airflow/howto/operator/bash.rst
@@ -70,8 +70,8 @@ inside the bash_command, as below:
 
     bash_task = BashOperator(
         task_id="bash_task",
-        bash_command='echo "here is the message: \'$message\'"',
-        env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
+        bash_command="echo \"here is the message: '$message'\"",
+        env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'},
     )
 
 Skipping
@@ -100,14 +100,13 @@ template to it, which will fail.
 .. code-block:: python
 
     t2 = BashOperator(
-        task_id='bash_example',
-
+        task_id="bash_example",
         # This fails with 'Jinja template not found' error
         # bash_command="/home/batcher/test.sh",
-
         # This works (has a space after)
         bash_command="/home/batcher/test.sh ",
-        dag=dag)
+        dag=dag,
+    )
 
 However, if you want to use templating in your bash script, do not add the space
 and instead put your bash script in a location relative to the directory containing
@@ -119,10 +118,11 @@ as shown below:
 .. code-block:: python
 
     t2 = BashOperator(
-        task_id='bash_example',
+        task_id="bash_example",
         # "scripts" folder is under "/usr/local/airflow/dags"
         bash_command="scripts/test.sh",
-        dag=dag)
+        dag=dag,
+    )
 
 Creating a separate folder for bash scripts may be desirable for many reasons, like
 separating your script's logic and pipeline code, allowing for proper code highlighting
@@ -138,7 +138,8 @@ Example:
 
     dag = DAG("example_bash_dag", template_searchpath="/opt/scripts")
     t2 = BashOperator(
-        task_id='bash_example',
+        task_id="bash_example",
         # "test.sh" is a file under "/opt/scripts"
         bash_command="test.sh ",
-        dag=dag)
+        dag=dag,
+    )
diff --git a/docs/apache-airflow/howto/variable.rst b/docs/apache-airflow/howto/variable.rst
index 3ea613b..7cb9377 100644
--- a/docs/apache-airflow/howto/variable.rst
+++ b/docs/apache-airflow/howto/variable.rst
@@ -53,6 +53,7 @@ You can use them in your DAGs as:
 .. code-block:: python
 
     from airflow.models import Variable
+
     foo = Variable.get("foo")
     foo_json = Variable.get("foo_baz", deserialize_json=True)
 
diff --git a/docs/apache-airflow/kubernetes.rst b/docs/apache-airflow/kubernetes.rst
index 873fb00..b88a83d 100644
--- a/docs/apache-airflow/kubernetes.rst
+++ b/docs/apache-airflow/kubernetes.rst
@@ -57,5 +57,6 @@ to every worker pod launched by KubernetesExecutor or KubernetesPodOperator.
 
     from kubernetes.client.models import V1Pod
 
+
     def pod_mutation_hook(pod: V1Pod):
-        pod.metadata.annotations['airflow.apache.org/launched-by'] = 'Tests'
+        pod.metadata.annotations["airflow.apache.org/launched-by"] = "Tests"
diff --git a/docs/apache-airflow/lineage.rst b/docs/apache-airflow/lineage.rst
index 362d3e6..227a386 100644
--- a/docs/apache-airflow/lineage.rst
+++ b/docs/apache-airflow/lineage.rst
@@ -40,20 +40,19 @@ works.
 
     FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
 
-    args = {
-        'owner': 'airflow',
-        'start_date': days_ago(2)
-    }
+    args = {"owner": "airflow", "start_date": days_ago(2)}
 
     dag = DAG(
-        dag_id='example_lineage', default_args=args,
-        schedule_interval='0 0 * * *',
-        dagrun_timeout=timedelta(minutes=60))
+        dag_id="example_lineage",
+        default_args=args,
+        schedule_interval="0 0 * * *",
+        dagrun_timeout=timedelta(minutes=60),
+    )
 
     f_final = File(url="/tmp/final")
-    run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
-        inlets=AUTO,
-        outlets=f_final)
+    run_this_last = DummyOperator(
+        task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final
+    )
 
     f_in = File(url="/tmp/whole_directory/")
     outlets = []
@@ -62,9 +61,7 @@ works.
         outlets.append(f_out)
 
     run_this = BashOperator(
-        task_id='run_me_first', bash_command='echo 1', dag=dag,
-        inlets=f_in,
-        outlets=outlets
+        task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets
     )
     run_this.set_downstream(run_this_last)
 
@@ -113,6 +110,8 @@ The backend should inherit from ``airflow.lineage.LineageBackend``.
 
   from airflow.lineage.backend import LineageBackend
 
+
   class ExampleBackend(LineageBackend):
-    def send_lineage(self, operator, inlets=None, outlets=None, context=None):
-      # Send the info to some external service
+      def send_lineage(self, operator, inlets=None, outlets=None, context=None):
+          ...
+          # Send the info to some external service
diff --git a/docs/apache-airflow/modules_management.rst b/docs/apache-airflow/modules_management.rst
index abe3cf0..bb71b6f 100644
--- a/docs/apache-airflow/modules_management.rst
+++ b/docs/apache-airflow/modules_management.rst
@@ -106,7 +106,7 @@ When we import this package, it should print the above message.
     import setuptools
 
     setuptools.setup(
-        name='airflow_operators',
+        name="airflow_operators",
     )
 
 5. Build the wheel:
diff --git a/docs/apache-airflow/plugins.rst b/docs/apache-airflow/plugins.rst
index 6f8f9a4..d59659e 100644
--- a/docs/apache-airflow/plugins.rst
+++ b/docs/apache-airflow/plugins.rst
@@ -122,8 +122,8 @@ looks like:
         #   to protect against extra parameters injected into the on_load(...)
         #   function in future changes
         def on_load(*args, **kwargs):
-           # ... perform Plugin boot actions
-           pass
+            # ... perform Plugin boot actions
+            pass
 
         # A list of global operator extra links that can redirect users to
         # external systems. These extra links will be available on the
@@ -133,7 +133,6 @@ looks like:
         # operator level.
         global_operator_extra_links = []
 
-
         # A list of operator extra links to override or add operator links
         # to existing Airflow Operators.
         # These extra links will be available on the task page in form of
@@ -172,17 +171,21 @@ definitions in Airflow.
     class PluginHook(BaseHook):
         pass
 
+
     # Will show up under airflow.macros.test_plugin.plugin_macro
     # and in templates through {{ macros.test_plugin.plugin_macro }}
     def plugin_macro():
         pass
 
+
     # Creating a flask blueprint to integrate the templates and static folder
     bp = Blueprint(
-        "test_plugin", __name__,
-        template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder
-        static_folder='static',
-        static_url_path='/static/test_plugin')
+        "test_plugin",
+        __name__,
+        template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
+        static_folder="static",
+        static_url_path="/static/test_plugin",
+    )
 
     # Creating a flask appbuilder BaseView
     class TestAppBuilderBaseView(AppBuilderBaseView):
@@ -192,6 +195,7 @@ definitions in Airflow.
         def test(self):
             return self.render_template("test_plugin/test.html", content="Hello galaxy!")
 
+
     # Creating a flask appbuilder BaseView
     class TestAppBuilderBaseNoMenuView(AppBuilderBaseView):
         default_view = "test"
@@ -200,15 +204,16 @@ definitions in Airflow.
         def test(self):
             return self.render_template("test_plugin/test.html", content="Hello galaxy!")
 
+
     v_appbuilder_view = TestAppBuilderBaseView()
-    v_appbuilder_package = {"name": "Test View",
-                            "category": "Test Plugin",
-                            "view": v_appbuilder_view}
+    v_appbuilder_package = {
+        "name": "Test View",
+        "category": "Test Plugin",
+        "view": v_appbuilder_view,
+    }
 
     v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView()
-    v_appbuilder_nomenu_package = {
-        "view": v_appbuilder_nomenu_view
-    }
+    v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view}
 
     # Creating flask appbuilder Menu Items
     appbuilder_mitem = {
@@ -229,16 +234,17 @@ definitions in Airflow.
         def get_link(self, operator, dttm):
             return "https://www.google.com"
 
+
     # A list of operator extra links to override or add operator links
     # to existing Airflow Operators.
     # These extra links will be available on the task page in form of
     # buttons.
     class S3LogLink(BaseOperatorLink):
-        name = 'S3'
+        name = "S3"
         operators = [GCSToS3Operator]
 
         def get_link(self, operator, dttm):
-            return 'https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}'.format(
+            return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}".format(
                 dag_id=operator.dag_id,
                 task_id=operator.task_id,
                 execution_date=dttm,
@@ -253,8 +259,12 @@ definitions in Airflow.
         flask_blueprints = [bp]
         appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package]
         appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]
-        global_operator_extra_links = [GoogleLink(),]
-        operator_extra_links = [S3LogLink(), ]
+        global_operator_extra_links = [
+            GoogleLink(),
+        ]
+        operator_extra_links = [
+            S3LogLink(),
+        ]
 
 
 Note on role based views
@@ -277,10 +287,11 @@ some views using a decorator.
 
     from airflow.www.app import csrf
 
+
     @csrf.exempt
     def my_handler():
         # ...
-        return 'ok'
+        return "ok"
 
 Plugins as Python packages
 --------------------------
@@ -302,14 +313,17 @@ will automatically load the registered plugins from the entrypoint list.
 
     # Creating a flask blueprint to integrate the templates and static folder
     bp = Blueprint(
-        "test_plugin", __name__,
-        template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder
-        static_folder='static',
-        static_url_path='/static/test_plugin')
+        "test_plugin",
+        __name__,
+        template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
+        static_folder="static",
+        static_url_path="/static/test_plugin",
+    )
+
 
     class MyAirflowPlugin(AirflowPlugin):
-      name = 'my_namespace'
-      flask_blueprints = [bp]
+        name = "my_namespace"
+        flask_blueprints = [bp]
 
 .. code-block:: python
 
@@ -317,12 +331,10 @@ will automatically load the registered plugins from the entrypoint list.
 
     setup(
         name="my-package",
-        ...
-        entry_points = {
-            'airflow.plugins': [
-                'my_plugin = my_package.my_plugin:MyAirflowPlugin'
-            ]
-        }
+        # ...
+        entry_points={
+            "airflow.plugins": ["my_plugin = my_package.my_plugin:MyAirflowPlugin"]
+        },
     )
 
 Automatic reloading webserver
diff --git a/docs/apache-airflow/security/secrets/fernet.rst b/docs/apache-airflow/security/secrets/fernet.rst
index 3ecbd0d..28c06fe 100644
--- a/docs/apache-airflow/security/secrets/fernet.rst
+++ b/docs/apache-airflow/security/secrets/fernet.rst
@@ -43,8 +43,9 @@ If you need to generate a new fernet key you can use the following code snippet.
     .. code-block:: python
 
       from cryptography.fernet import Fernet
-      fernet_key= Fernet.generate_key()
-      print(fernet_key.decode()) # your fernet_key, keep it in secured place!
+
+      fernet_key = Fernet.generate_key()
+      print(fernet_key.decode())  # your fernet_key, keep it in secured place!
 
 
 Rotating encryption keys
diff --git a/docs/apache-airflow/security/secrets/index.rst b/docs/apache-airflow/security/secrets/index.rst
index fd5bbd0..96d576b 100644
--- a/docs/apache-airflow/security/secrets/index.rst
+++ b/docs/apache-airflow/security/secrets/index.rst
@@ -78,6 +78,7 @@ your DAG file or operator's ``execute`` function using the ``mask_secret`` funct
     @task
     def my_func():
         from airflow.utils.log.secrets_masker import mask_secret
+
         mask_secret("custom_value")
 
         ...
@@ -88,9 +89,9 @@ or
 
 
     class MyOperator(BaseOperator):
-
         def execute(self, context):
             from airflow.utils.log.secrets_masker import mask_secret
+
             mask_secret("custom_value")
 
             ...
diff --git a/docs/apache-airflow/timezone.rst b/docs/apache-airflow/timezone.rst
index 63d1cec..449393c 100644
--- a/docs/apache-airflow/timezone.rst
+++ b/docs/apache-airflow/timezone.rst
@@ -75,7 +75,7 @@ Because Airflow uses time-zone-aware datetime objects. If your code creates date
     from airflow.utils import timezone
 
     now = timezone.utcnow()
-    a_date = timezone.datetime(2017,1,1)
+    a_date = timezone.datetime(2017, 1, 1)
 
 
 Interpretation of naive datetime objects
@@ -90,14 +90,11 @@ words if you have a default time zone setting of ``Europe/Amsterdam`` and create
 
 .. code-block:: python
 
-    default_args=dict(
-        start_date=datetime(2016, 1, 1),
-        owner='airflow'
-    )
+    default_args = dict(start_date=datetime(2016, 1, 1), owner="airflow")
 
-    dag = DAG('my_dag', default_args=default_args)
-    op = DummyOperator(task_id='dummy', dag=dag)
-    print(op.owner) # Airflow
+    dag = DAG("my_dag", default_args=default_args)
+    op = DummyOperator(task_id="dummy", dag=dag)
+    print(op.owner)  # Airflow
 
 Unfortunately, during DST transitions, some datetimes don’t exist or are ambiguous.
 In such situations, pendulum raises an exception. That’s why you should always create aware
@@ -137,14 +134,11 @@ using ``pendulum``.
 
     local_tz = pendulum.timezone("Europe/Amsterdam")
 
-    default_args=dict(
-        start_date=datetime(2016, 1, 1, tzinfo=local_tz),
-        owner='airflow'
-    )
+    default_args = dict(start_date=datetime(2016, 1, 1, tzinfo=local_tz), owner="airflow")
 
-    dag = DAG('my_tz_dag', default_args=default_args)
-    op = DummyOperator(task_id='dummy', dag=dag)
-    print(dag.timezone) # <Timezone [Europe/Amsterdam]>
+    dag = DAG("my_tz_dag", default_args=default_args)
+    op = DummyOperator(task_id="dummy", dag=dag)
+    print(dag.timezone)  # <Timezone [Europe/Amsterdam]>
 
 Please note that while it is possible to set a ``start_date`` and ``end_date`` for Tasks always the DAG timezone
 or global timezone (in that order) will be used to calculate the next execution date. Upon first encounter
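
As a hedged complement to the two snippets above (the operator, dag id, and schedule are illustrative), the same aware ``start_date`` can be built directly with ``pendulum.datetime``, and it is then the DAG timezone that drives schedule calculation, as the note above explains:

.. code-block:: python

    # a minimal sketch, assuming the pendulum 2.x that ships with Airflow 2
    import pendulum
    from airflow import DAG
    from airflow.operators.dummy import DummyOperator

    local_tz = pendulum.timezone("Europe/Amsterdam")

    dag = DAG(
        "my_tz_dag_example",  # illustrative dag_id
        start_date=pendulum.datetime(2021, 1, 1, tz=local_tz),
        schedule_interval="@daily",
    )
    op = DummyOperator(task_id="dummy", dag=dag)
    print(dag.timezone)  # <Timezone [Europe/Amsterdam]>
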
diff --git a/docs/apache-airflow/tutorial_taskflow_api.rst b/docs/apache-airflow/tutorial_taskflow_api.rst
index 39820b2..03da9e0 100644
--- a/docs/apache-airflow/tutorial_taskflow_api.rst
+++ b/docs/apache-airflow/tutorial_taskflow_api.rst
@@ -214,11 +214,11 @@ Building this dependency is shown in the code below:
         A simple Extract task to get data ready for the rest of the data
         pipeline, by reading the data from a file into a pandas dataframe
         """
-        order_data_file = '/tmp/order_data.csv'
+        order_data_file = "/tmp/order_data.csv"
         order_data_df = pd.read_csv(order_data_file)
 
 
-    file_task = FileSensor(task_id='check_file', filepath='/tmp/order_data.csv')
+    file_task = FileSensor(task_id="check_file", filepath="/tmp/order_data.csv")
     order_data = extract_from_file()
 
     file_task >> order_data
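
For orientation, a hedged, self-contained sketch of how this fragment might sit inside a complete TaskFlow DAG (the dag id, schedule, and extra imports are illustrative, not part of the tutorial):

.. code-block:: python

    import pendulum
    from airflow.decorators import dag, task
    from airflow.sensors.filesystem import FileSensor


    @dag(
        schedule_interval=None,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
    )
    def file_then_extract():
        @task()
        def extract_from_file():
            """Read the order data once the sensor has confirmed the file exists."""
            import pandas as pd

            order_data_file = "/tmp/order_data.csv"
            order_data_df = pd.read_csv(order_data_file)

        file_task = FileSensor(task_id="check_file", filepath="/tmp/order_data.csv")
        order_data = extract_from_file()
        file_task >> order_data


    tutorial_dag = file_then_extract()
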
diff --git a/docs/apache-airflow/ui.rst b/docs/apache-airflow/ui.rst
index 728c2ae..26a0893 100644
--- a/docs/apache-airflow/ui.rst
+++ b/docs/apache-airflow/ui.rst
@@ -36,7 +36,7 @@ For example:
 
 .. code-block:: python
 
-   dag = DAG('dag', tags=['team1', 'sql'])
+   dag = DAG("dag", tags=["team1", "sql"])
 
 
 ------------
diff --git a/docs/apache-airflow/upgrading-to-2.rst b/docs/apache-airflow/upgrading-to-2.rst
index 96293ae..1473e4f 100644
--- a/docs/apache-airflow/upgrading-to-2.rst
+++ b/docs/apache-airflow/upgrading-to-2.rst
@@ -146,14 +146,14 @@ The behavior can be reverted when instantiating a DAG.
 
     import jinja2
 
-    dag = DAG('simple_dag', template_undefined=jinja2.Undefined)
+    dag = DAG("simple_dag", template_undefined=jinja2.Undefined)
 
 Alternatively, it is also possible to override each Jinja Template variable on an individual basis
 by using the ``| default`` Jinja filter as shown below.
 
 .. code-block:: python
 
-    {{ a | default(1) }}
+    {{a | default(1)}}
 
 
 
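The ``| default`` filter above is meant to be used inside templated fields; a hedged sketch of what that looks like in practice (the dag id, task id, and param name are made up):

.. code-block:: python

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG("default_filter_demo", start_date=datetime(2021, 1, 1), schedule_interval=None):
        BashOperator(
            task_id="print_value",
            # renders "1" when no value for ``params.retries`` is supplied,
            # even with the stricter undefined behaviour described above
            bash_command="echo {{ params.retries | default(1) }}",
        )
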
@@ -174,23 +174,18 @@ Whereas previously a user would import each individual class to build the pod as
     from airflow.kubernetes.volume_mount import VolumeMount
 
 
-    volume_config = {
-        'persistentVolumeClaim': {
-            'claimName': 'test-volume'
-        }
-    }
-    volume = Volume(name='test-volume', configs=volume_config)
-    volume_mount = VolumeMount('test-volume',
-                               mount_path='/root/mount_file',
-                               sub_path=None,
-                               read_only=True)
+    volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}}
+    volume = Volume(name="test-volume", configs=volume_config)
+    volume_mount = VolumeMount(
+        "test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
+    )
 
-    port = Port('http', 80)
-    secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
-    secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
+    port = Port("http", 80)
+    secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
+    secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
 
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo", "10"],
@@ -219,23 +214,25 @@ Now the user can use the ``kubernetes.client.models`` class as a single point of
     from airflow.kubernetes.secret import Secret
 
 
-    configmaps = ['test-configmap-1', 'test-configmap-2']
+    configmaps = ["test-configmap-1", "test-configmap-2"]
 
     volume = k8s.V1Volume(
-        name='test-volume',
-        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
+        name="test-volume",
+        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
+            claim_name="test-volume"
+        ),
     )
 
-    port = k8s.V1ContainerPort(name='http', container_port=80)
-    secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
-    secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
-    secret_all_keys = Secret('env', None, 'airflow-secrets-2')
+    port = k8s.V1ContainerPort(name="http", container_port=80)
+    secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
+    secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
+    secret_all_keys = Secret("env", None, "airflow-secrets-2")
     volume_mount = k8s.V1VolumeMount(
-        name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True
+        name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
     )
 
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo", "10"],
@@ -247,7 +244,8 @@ Now the user can use the ``kubernetes.client.models`` class as a single point of
         name="airflow-test-pod",
         task_id="task",
         is_delete_operator_pod=True,
-        hostnetwork=False)
+        hostnetwork=False,
+    )
 
 
 We decided to keep the Secret class as users seem to really like that simplifies the complexity of mounting
@@ -445,9 +443,9 @@ While in the deprecated version a user would mount a volume using the following
                         "mountPath": "/foo/",
                         "name": "example-kubernetes-test-volume",
                     },
-                ]
+                ],
             }
-        }
+        },
     )
 
 In the new model a user can accomplish the same thing using the following code under the ``pod_override`` key:
@@ -459,30 +457,29 @@ In the new model a user can accomplish the same thing using the following code u
     second_task = PythonOperator(
         task_id="four_task",
         python_callable=test_volume_mount,
-        executor_config={"pod_override": k8s.V1Pod(
-            spec=k8s.V1PodSpec(
-                containers=[
-                    k8s.V1Container(
-                        name="base",
-                        volume_mounts=[
-                            k8s.V1VolumeMount(
-                                mount_path="/foo/",
-                                name="example-kubernetes-test-volume"
-                            )
-                        ]
-                    )
-                ],
-                volumes=[
-                    k8s.V1Volume(
-                        name="example-kubernetes-test-volume",
-                        host_path=k8s.V1HostPathVolumeSource(
-                            path="/tmp/"
+        executor_config={
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[
+                                k8s.V1VolumeMount(
+                                    mount_path="/foo/",
+                                    name="example-kubernetes-test-volume",
+                                )
+                            ],
+                        )
+                    ],
+                    volumes=[
+                        k8s.V1Volume(
+                            name="example-kubernetes-test-volume",
+                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                         )
-                    )
-                ]
+                    ],
+                )
             )
-        )
-        }
+        },
     )
 
 For Airflow 2.0, the traditional ``executor_config`` will continue operation with a deprecation warning,
@@ -551,9 +548,10 @@ Before:
 .. code-block:: python
 
     from airflow.kubernetes.pod import Port
-    port = Port('http', 80)
+
+    port = Port("http", 80)
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -566,9 +564,10 @@ After:
 .. code-block:: python
 
     from kubernetes.client import models as k8s
-    port = k8s.V1ContainerPort(name='http', container_port=80)
+
+    port = k8s.V1ContainerPort(name="http", container_port=80)
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -583,12 +582,12 @@ Before:
 .. code-block:: python
 
     from airflow.kubernetes.volume_mount import VolumeMount
-    volume_mount = VolumeMount('test-volume',
-                               mount_path='/root/mount_file',
-                               sub_path=None,
-                               read_only=True)
+
+    volume_mount = VolumeMount(
+        "test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
+    )
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -601,11 +600,12 @@ After:
 .. code-block:: python
 
     from kubernetes.client import models as k8s
+
     volume_mount = k8s.V1VolumeMount(
-        name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True
+        name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
     )
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -622,14 +622,10 @@ Before:
 
     from airflow.kubernetes.volume import Volume
 
-    volume_config = {
-        'persistentVolumeClaim': {
-            'claimName': 'test-volume'
-    }
-    }
-    volume = Volume(name='test-volume', configs=volume_config)
+    volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}}
+    volume = Volume(name="test-volume", configs=volume_config)
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -642,12 +638,15 @@ After:
 .. code-block:: python
 
     from kubernetes.client import models as k8s
+
     volume = k8s.V1Volume(
-        name='test-volume',
-        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
+        name="test-volume",
+        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
+            claim_name="test-volume"
+        ),
     )
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -662,7 +661,7 @@ Before:
 .. code-block:: python
 
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -677,17 +676,12 @@ After:
     from kubernetes.client import models as k8s
 
     env_vars = [
-        k8s.V1EnvVar(
-            name="ENV1",
-            value="val1"
-        ),
-        k8s.V1EnvVar(
-            name="ENV2",
-            value="val2"
-        )]
+        k8s.V1EnvVar(name="ENV1", value="val1"),
+        k8s.V1EnvVar(name="ENV2", value="val2"),
+    ]
 
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -707,7 +701,7 @@ Before:
     from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
 
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -725,15 +719,13 @@ After:
         k8s.V1EnvVar(
             name="ENV3",
             value_from=k8s.V1EnvVarSource(
-                field_ref=k8s.V1ObjectFieldSelector(
-                    field_path="status.podIP"
-                )
-            )
+                field_ref=k8s.V1ObjectFieldSelector(field_path="status.podIP")
+            ),
         )
     ]
 
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -750,12 +742,12 @@ Before:
 .. code-block:: python
 
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
-        configmaps=['test-configmap'],
-        task_id="task"
+        configmaps=["test-configmap"],
+        task_id="task",
     )
 
 
@@ -765,20 +757,18 @@ After:
 
     from kubernetes.client import models as k8s
 
-    configmap ="test-configmap"
-    env_from = [k8s.V1EnvFromSource(
-                    config_map_ref=k8s.V1ConfigMapEnvSource(
-                        name=configmap
-                    )
-                )]
+    configmap = "test-configmap"
+    env_from = [
+        k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap))
+    ]
 
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
         env_from=env_from,
-        task_id="task"
+        task_id="task",
     )
 
 
@@ -789,15 +779,15 @@ Before:
 .. code-block:: python
 
     resources = {
-        'limit_cpu': 0.25,
-        'limit_memory': '64Mi',
-        'limit_ephemeral_storage': '2Gi',
-        'request_cpu': '250m',
-        'request_memory': '64Mi',
-        'request_ephemeral_storage': '1Gi',
+        "limit_cpu": 0.25,
+        "limit_memory": "64Mi",
+        "limit_ephemeral_storage": "2Gi",
+        "request_cpu": "250m",
+        "request_memory": "64Mi",
+        "request_ephemeral_storage": "1Gi",
     }
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -815,21 +805,17 @@ After:
 
     from kubernetes.client import models as k8s
 
-    resources=k8s.V1ResourceRequirements(
-        requests={
-            'memory': '64Mi',
-            'cpu': '250m',
-            'ephemeral-storage': '1Gi'
-        },
+    resources = k8s.V1ResourceRequirements(
+        requests={"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"},
         limits={
-            'memory': '64Mi',
-            'cpu': 0.25,
-            'nvidia.com/gpu': None,
-            'ephemeral-storage': '2Gi'
-        }
+            "memory": "64Mi",
+            "cpu": 0.25,
+            "nvidia.com/gpu": None,
+            "ephemeral-storage": "2Gi",
+        },
     )
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
@@ -849,14 +835,14 @@ Before:
 .. code-block:: python
 
     k = KubernetesPodOperator(
-        namespace='default',
+        namespace="default",
         image="ubuntu:16.04",
         cmds=["bash", "-cx"],
         arguments=["echo 10"],
         name="test",
         task_id="task",
         image_pull_secrets="fake-secret",
-        cluster_context='default'
+        cluster_context="default",
     )
 
 After:
@@ -864,9 +850,9 @@ After:
 .. code-block:: python
 
     quay_k8s = KubernetesPodOperator(
-        namespace='default',
-        image='quay.io/apache/bash',
-        image_pull_secrets=[k8s.V1LocalObjectReference('testquay')],
+        namespace="default",
+        image="quay.io/apache/bash",
+        image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
         cmds=["bash", "-cx"],
         name="airflow-private-image-pod",
         task_id="task-two",
@@ -1136,19 +1122,19 @@ non-RBAC UI (``flask-admin`` based UI), update it to use ``flask_appbuilder_view
 
 
     class TestView(BaseView):
-        @expose('/')
+        @expose("/")
         def test(self):
             # in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
             return self.render("test_plugin/test.html", content="Hello galaxy!")
 
+
     v = TestView(category="Test Plugin", name="Test View")
 
     ml = MenuLink(
-        category='Test Plugin',
-        name='Test Menu Link',
-        url='https://airflow.apache.org/'
+        category="Test Plugin", name="Test Menu Link", url="https://airflow.apache.org/"
     )
 
+
     class AirflowTestPlugin(AirflowPlugin):
         admin_views = [v]
         menu_links = [ml]
@@ -1161,6 +1147,7 @@ non-RBAC UI (``flask-admin`` based UI), update it to use ``flask_appbuilder_view
     from airflow.plugins_manager import AirflowPlugin
     from flask_appbuilder import expose, BaseView as AppBuilderBaseView
 
+
     class TestAppBuilderBaseView(AppBuilderBaseView):
         default_view = "test"
 
@@ -1168,16 +1155,21 @@ non-RBAC UI (``flask-admin`` based UI), update it to use ``flask_appbuilder_view
         def test(self):
             return self.render_template("test_plugin/test.html", content="Hello galaxy!")
 
+
     v_appbuilder_view = TestAppBuilderBaseView()
-    v_appbuilder_package = {"name": "Test View",
-                            "category": "Test Plugin",
-                            "view": v_appbuilder_view}
+    v_appbuilder_package = {
+        "name": "Test View",
+        "category": "Test Plugin",
+        "view": v_appbuilder_view,
+    }
 
     # Creating a flask appbuilder Menu Item
-    appbuilder_mitem = {"name": "Google",
-                        "category": "Search",
-                        "category_icon": "fa-th",
-                        "href": "https://www.google.com"}
+    appbuilder_mitem = {
+        "name": "Google",
+        "category": "Search",
+        "category_icon": "fa-th",
+        "href": "https://www.google.com",
+    }
 
 
     # Defining the plugin class
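
As a hedged sketch (not the literal continuation of the docs page), the view and menu-item objects built above are typically registered on the plugin like this:

.. code-block:: python

    # assumes the imports and the v_appbuilder_package / appbuilder_mitem objects
    # defined in the snippet above
    class AirflowTestPlugin(AirflowPlugin):
        name = "test_plugin"
        appbuilder_views = [v_appbuilder_package]
        appbuilder_menu_items = [appbuilder_mitem]
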
diff --git a/tests/dags_corrupted/README.md b/tests/dags_corrupted/README.md
index 2cf2aaa..5c3a027 100644
--- a/tests/dags_corrupted/README.md
+++ b/tests/dags_corrupted/README.md
@@ -26,7 +26,8 @@ To access a DAG in this folder, use the following code inside a unit test.
 
 ```python
 TEST_DAG_FOLDER = os.path.join(
-    os.path.dirname(os.path.realpath(__file__)), 'dags_corrupted')
+    os.path.dirname(os.path.realpath(__file__)), "dags_corrupted"
+)
 
 dagbag = DagBag(dag_folder=TEST_DAG_FOLDER)
 dag = dagbag.get_dag(dag_id)
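
The fragment above assumes `os`, `DagBag`, and `dag_id` are already in scope; a hedged, self-contained version (the dag id is a placeholder) could look like:

```python
import os

from airflow.models import DagBag

TEST_DAG_FOLDER = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), "dags_corrupted"
)

dagbag = DagBag(dag_folder=TEST_DAG_FOLDER)
dag = dagbag.get_dag("test_example_dag")  # placeholder dag_id
```
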
diff --git a/tests/test_utils/perf/perf_kit/__init__.py b/tests/test_utils/perf/perf_kit/__init__.py
index 25583fc..f9b0fe2 100644
--- a/tests/test_utils/perf/perf_kit/__init__.py
+++ b/tests/test_utils/perf/perf_kit/__init__.py
@@ -66,16 +66,18 @@ Suppose we have the following fragment of the file with tests.
 
 .. code-block:: python
 
-        prev = dag.previous_schedule(_next)
-        prev_local = local_tz.convert(prev)
+    prev = dag.previous_schedule(_next)
+    prev_local = local_tz.convert(prev)
+
+    assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00"
+    assert prev.isoformat() == "2018-03-24T02:00:00+00:00"
 
-        assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00"
-        assert prev.isoformat() == "2018-03-24T02:00:00+00:00"
 
     def test_bulk_write_to_db(self):
         clear_db_dags()
         dags = [
-            DAG(f'dag-bulk-sync-{i}', start_date=DEFAULT_DATE, tags=["test-dag"]) for i in range(0, 4)
+            DAG(f"dag-bulk-sync-{i}", start_date=DEFAULT_DATE, tags=["test-dag"])
+            for i in range(0, 4)
         ]
 
         with assert_queries_count(3):
@@ -87,19 +89,21 @@ queries in it.
 .. code-block:: python
    :emphasize-lines: 6-8
 
-        prev = dag.previous_schedule(_next)
-        prev_local = local_tz.convert(prev)
+    prev = dag.previous_schedule(_next)
+    prev_local = local_tz.convert(prev)
 
-        assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00"
-        assert prev.isoformat() == "2018-03-24T02:00:00+00:00"
+    assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00"
+    assert prev.isoformat() == "2018-03-24T02:00:00+00:00"
 
     from tests.utils.perf.perf_kit.sqlalchemy import trace_queries
 
+
     @trace_queries
     def test_bulk_write_to_db(self):
         clear_db_dags()
         dags = [
-            DAG(f'dag-bulk-sync-{i}', start_date=DEFAULT_DATE, tags=["test-dag"]) for i in range(0, 4)
+            DAG(f"dag-bulk-sync-{i}", start_date=DEFAULT_DATE, tags=["test-dag"])
+            for i in range(0, 4)
         ]
 
         with assert_queries_count(3):