You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/03/04 16:43:45 UTC

[airflow] branch main updated: Upgrade FAB to 4.3.0 (#29766)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b7f05008ae Upgrade FAB to 4.3.0 (#29766)
b7f05008ae is described below

commit b7f05008ae45fc208456f7f64c19d08ad1cf7313
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sat Mar 4 17:43:35 2023 +0100

    Upgrade FAB to 4.3.0 (#29766)
    
    FAB 4.3.0 added rate limiting and we would like to upgrade to this
    version.
    
    This requires bringing in some of the changes from the PRs merged in
    Flask App Builder:
    
    * https://github.com/dpgaspar/Flask-AppBuilder/pull/1976
    * https://github.com/dpgaspar/Flask-AppBuilder/pull/1997
    * https://github.com/dpgaspar/Flask-AppBuilder/pull/1999
    
    While Flask App Builder disabled rate limiting by default, Airflow
    is an "end product" using it and we decided to enable it.
---
 airflow/config_templates/config.yml                | 14 +++++
 airflow/config_templates/default_airflow.cfg       |  6 ++
 airflow/www/extensions/init_appbuilder.py          | 23 ++++++++
 airflow/www/fab_security/manager.py                | 47 ++++++++++++++-
 .../security/webserver.rst                         | 21 +++++++
 docs/apache-airflow/howto/set-config.rst           | 12 ++++
 setup.cfg                                          |  2 +-
 setup.py                                           |  2 +-
 tests/conftest.py                                  |  7 ++-
 tests/test_utils/www.py                            |  4 +-
 tests/www/views/conftest.py                        |  5 +-
 tests/www/views/test_views_log.py                  |  7 ++-
 tests/www/views/test_views_rate_limit.py           | 69 ++++++++++++++++++++++
 13 files changed, 210 insertions(+), 9 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index eb87263399..c9d0bf92ac 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1600,6 +1600,20 @@ webserver:
       type: boolean
       example: ~
       default: "False"
+    auth_rate_limited:
+      description: |
+        Boolean for enabling rate limiting on authentication endpoints.
+      version_added: 2.6.0
+      type: boolean
+      example: ~
+      default: "True"
+    auth_rate_limit:
+      description: |
+          Rate limit for authentication endpoints.
+      version_added: 2.6.0
+      type: string
+      example: ~
+      default: "5 per 40 second"
 
 email:
   description: |
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 572e5326c5..a8e8266a85 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -824,6 +824,12 @@ enable_swagger_ui = True
 # Boolean for running Internal API in the webserver.
 run_internal_api = False
 
+# Boolean for enabling rate limiting on authentication endpoints.
+auth_rate_limited = True
+
+# Rate limit for authentication endpoints.
+auth_rate_limit = 5 per 40 second
+
 [email]
 
 # Configuration email backend and whether to
diff --git a/airflow/www/extensions/init_appbuilder.py b/airflow/www/extensions/init_appbuilder.py
index fe1ae0938d..dbd2d93938 100644
--- a/airflow/www/extensions/init_appbuilder.py
+++ b/airflow/www/extensions/init_appbuilder.py
@@ -125,6 +125,8 @@ class AirflowAppBuilder:
         static_url_path="/appbuilder",
         security_manager_class=None,
         update_perms=conf.getboolean("webserver", "UPDATE_FAB_PERMS"),
+        auth_rate_limited=conf.getboolean("webserver", "AUTH_RATE_LIMITED", fallback=True),
+        auth_rate_limit=conf.get("webserver", "AUTH_RATE_LIMIT", fallback="5 per 40 second"),
     ):
         """
         App-builder constructor.
@@ -146,6 +148,10 @@ class AirflowAppBuilder:
         :param update_perms:
             optional, update permissions flag (Boolean) you can use
             FAB_UPDATE_PERMS config key also
+        :param auth_rate_limited:
+            optional, rate limit authentication attempts if set to True (defaults to True)
+        :param auth_rate_limit:
+            optional, rate limit authentication attempts configuration (defaults to "5 per 40 second")
         """
         self.baseviews = []
         self._addon_managers = []
@@ -158,6 +164,8 @@ class AirflowAppBuilder:
         self.static_url_path = static_url_path
         self.app = app
         self.update_perms = update_perms
+        self.auth_rate_limited = auth_rate_limited
+        self.auth_rate_limit = auth_rate_limit
         if app is not None:
             self.init_app(app, session)
 
@@ -172,10 +180,13 @@ class AirflowAppBuilder:
         app.config.setdefault("APP_ICON", "")
         app.config.setdefault("LANGUAGES", {"en": {"flag": "gb", "name": "English"}})
         app.config.setdefault("ADDON_MANAGERS", [])
+        app.config.setdefault("RATELIMIT_ENABLED", self.auth_rate_limited)
         app.config.setdefault("FAB_API_MAX_PAGE_SIZE", 100)
         app.config.setdefault("FAB_BASE_TEMPLATE", self.base_template)
         app.config.setdefault("FAB_STATIC_FOLDER", self.static_folder)
         app.config.setdefault("FAB_STATIC_URL_PATH", self.static_url_path)
+        app.config.setdefault("AUTH_RATE_LIMITED", self.auth_rate_limited)
+        app.config.setdefault("AUTH_RATE_LIMIT", self.auth_rate_limit)
 
         self.app = app
 
@@ -433,6 +444,7 @@ class AirflowAppBuilder:
             if self.app:
                 self.register_blueprint(baseview)
                 self._add_permission(baseview)
+                self.add_limits(baseview)
         self.add_link(
             name=name,
             href=href,
@@ -563,6 +575,11 @@ class AirflowAppBuilder:
         """
         return self.sm.security_converge(self.baseviews, self.menu, dry)
 
+    def get_url_for_login_with(self, next_url: str | None = None) -> str:
+        if self.sm.auth_view is None:
+            return ""
+        return url_for(f"{self.sm.auth_view.endpoint}.{'login'}", next=next_url)
+
     @property
     def get_url_for_login(self):
         return url_for(f"{self.sm.auth_view.endpoint}.login")
@@ -585,6 +602,10 @@ class AirflowAppBuilder:
             locale=lang,
         )
 
+    def add_limits(self, baseview) -> None:
+        if hasattr(baseview, "limits"):
+            self.sm.add_limit_view(baseview)
+
     def add_permissions(self, update_perms=False):
         if self.update_perms or update_perms:
             for baseview in self.baseviews:
@@ -653,4 +674,6 @@ def init_appbuilder(app) -> AirflowAppBuilder:
         security_manager_class=security_manager_class,
         base_template="airflow/main.html",
         update_perms=conf.getboolean("webserver", "UPDATE_FAB_PERMS"),
+        auth_rate_limited=conf.getboolean("webserver", "AUTH_RATE_LIMITED", fallback=True),
+        auth_rate_limit=conf.get("webserver", "AUTH_RATE_LIMIT", fallback="5 per 40 second"),
     )
diff --git a/airflow/www/fab_security/manager.py b/airflow/www/fab_security/manager.py
index a23315b814..8f0363186b 100644
--- a/airflow/www/fab_security/manager.py
+++ b/airflow/www/fab_security/manager.py
@@ -26,7 +26,7 @@ import re
 from typing import Any
 from uuid import uuid4
 
-from flask import current_app, g, session, url_for
+from flask import Flask, current_app, g, session, url_for
 from flask_appbuilder import AppBuilder
 from flask_appbuilder.const import (
     AUTH_DB,
@@ -67,6 +67,8 @@ from flask_appbuilder.security.views import (
 )
 from flask_babel import lazy_gettext as _
 from flask_jwt_extended import JWTManager, current_user as current_user_jwt
+from flask_limiter import Limiter
+from flask_limiter.util import get_remote_address
 from flask_login import AnonymousUserMixin, LoginManager, current_user
 from werkzeug.security import check_password_hash, generate_password_hash
 
@@ -252,6 +254,10 @@ class BaseSecurityManager:
             app.config.setdefault("AUTH_LDAP_LASTNAME_FIELD", "sn")
             app.config.setdefault("AUTH_LDAP_EMAIL_FIELD", "mail")
 
+        # Rate limiting
+        app.config.setdefault("AUTH_RATE_LIMITED", True)
+        app.config.setdefault("AUTH_RATE_LIMIT", "5 per 40 second")
+
         if self.auth_type == AUTH_OID:
             from flask_openid import OpenID
 
@@ -280,6 +286,14 @@ class BaseSecurityManager:
         # Setup Flask-Jwt-Extended
         self.jwt_manager = self.create_jwt_manager(app)
 
+        # Setup Flask-Limiter
+        self.limiter = self.create_limiter(app)
+
+    def create_limiter(self, app: Flask) -> Limiter:
+        limiter = Limiter(key_func=get_remote_address)
+        limiter.init_app(app)
+        return limiter
+
     def create_login_manager(self, app) -> LoginManager:
         """
         Override to implement your custom login manager instance
@@ -514,6 +528,14 @@ class BaseSecurityManager:
         """Oauth providers"""
         return self.appbuilder.get_app.config["OAUTH_PROVIDERS"]
 
+    @property
+    def is_auth_limited(self) -> bool:
+        return self.appbuilder.get_app.config["AUTH_RATE_LIMITED"]
+
+    @property
+    def auth_rate_limit(self) -> str:
+        return self.appbuilder.get_app.config["AUTH_RATE_LIMIT"]
+
     @property
     def current_user(self):
         """Current user object"""
@@ -750,6 +772,11 @@ class BaseSecurityManager:
 
         self.appbuilder.add_view_no_menu(self.auth_view)
 
+        # this needs to be done after the view is added, otherwise the blueprint
+        # is not initialized
+        if self.is_auth_limited:
+            self.limiter.limit(self.auth_rate_limit, methods=["POST"])(self.auth_view.blueprint)
+
         self.user_view = self.appbuilder.add_view(
             self.user_view,
             "List Users",
@@ -1397,6 +1424,24 @@ class BaseSecurityManager:
         else:
             return self._get_user_permission_resources(None, "menu_access", resource_names=menu_names)
 
+    def add_limit_view(self, baseview):
+        if not baseview.limits:
+            return
+
+        for limit in baseview.limits:
+            self.limiter.limit(
+                limit_value=limit.limit_value,
+                key_func=limit.key_func,
+                per_method=limit.per_method,
+                methods=limit.methods,
+                error_message=limit.error_message,
+                exempt_when=limit.exempt_when,
+                override_defaults=limit.override_defaults,
+                deduct_when=limit.deduct_when,
+                on_breach=limit.on_breach,
+                cost=limit.cost,
+            )(baseview.blueprint)
+
     def add_permissions_view(self, base_action_names, resource_name):  # Keep name for compatibility with FAB.
         """
         Adds an action on a resource to the backend
diff --git a/docs/apache-airflow/administration-and-deployment/security/webserver.rst b/docs/apache-airflow/administration-and-deployment/security/webserver.rst
index 7e3cf3b6df..d5e551f8b1 100644
--- a/docs/apache-airflow/administration-and-deployment/security/webserver.rst
+++ b/docs/apache-airflow/administration-and-deployment/security/webserver.rst
@@ -263,3 +263,24 @@ certs and keys.
     ssl_key = <path to key>
     ssl_cert = <path to cert>
     ssl_cacert = <path to cacert>
+
+Rate limiting
+-------------
+
+Airflow can be configured to limit the number of authentication requests in a given time window. We use
+`Flask-Limiter <https://flask-limiter.readthedocs.io/en/stable/>`_ to achieve that, and by default Airflow
+applies a per-webserver limit of 5 requests per 40-second fixed window. By default, no common storage for
+rate limits is shared between the gunicorn processes you run, so the limit is applied separately in each process.
+Assuming a random distribution of requests by gunicorn, with a single webserver instance and the default 4
+gunicorn workers, the effective rate limit is 5 x 4 = 20 requests per 40-second window (more or less).
+However, you can share the rate limit between the processes by configuring a rate limit storage through
+the ``RATELIMIT_*`` configuration settings in ``webserver_config.py``.
+For example, to use Redis as a rate limit storage you can use the following configuration (you need
+to set ``redis_host`` to your Redis instance):
+
+.. code-block:: python
+
+    RATELIMIT_STORAGE_URI = 'redis://redis_host:6379/0'
+
+You can also configure other rate limit settings in ``webserver_config.py`` - for more details, see the
+`Flask Limiter rate limit configuration <https://flask-limiter.readthedocs.io/en/stable/configuration.html>`_.
diff --git a/docs/apache-airflow/howto/set-config.rst b/docs/apache-airflow/howto/set-config.rst
index 0f7cf58504..02fa9bac2d 100644
--- a/docs/apache-airflow/howto/set-config.rst
+++ b/docs/apache-airflow/howto/set-config.rst
@@ -130,3 +130,15 @@ the example below.
     generated using the secret key has a short expiry time though - make sure that time on ALL the machines
     that you run airflow components on is synchronized (for example using ntpd) otherwise you might get
     "forbidden" errors when the logs are accessed.
+
+
+Configuring Flask Application for Airflow Webserver
+===================================================
+
+Airflow uses Flask to render the web UI. When you initialize the Airflow webserver, predefined configuration
+is used, based on the ``webserver`` section of the ``airflow.cfg`` file. You can, however, override these settings
+and add any extra settings by adding Flask configuration to the ``webserver_config.py`` file in your
+``$AIRFLOW_HOME`` directory. This file is automatically loaded by the webserver.
+
+For example, if you would like to change the rate limit strategy to "moving window", you can set
+``RATELIMIT_STRATEGY`` to ``moving-window``.
diff --git a/setup.cfg b/setup.cfg
index c4cb3f0809..8c52f566cf 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -93,7 +93,7 @@ install_requires =
     # `airflow/www/fab_security` with their upstream counterparts. In particular, make sure any breaking changes,
     # for example any new methods, are accounted for.
     # NOTE! When you change the value here, you also have to update flask-appbuilder[oauth] in setup.py
-    flask-appbuilder==4.1.4
+    flask-appbuilder==4.3.0
     flask-caching>=1.5.0
     flask-login>=0.6.2
     flask-session>=0.4.0
diff --git a/setup.py b/setup.py
index 24859ea2ce..f29a923c76 100644
--- a/setup.py
+++ b/setup.py
@@ -278,7 +278,7 @@ doc_gen = [
 flask_appbuilder_oauth = [
     "authlib>=1.0.0",
     # The version here should be upgraded at the same time as flask-appbuilder in setup.cfg
-    "flask-appbuilder[oauth]==4.1.4",
+    "flask-appbuilder[oauth]==4.3.0",
 ]
 kerberos = [
     "pykerberos>=1.1.13",
diff --git a/tests/conftest.py b/tests/conftest.py
index 680bee6f6c..0b4c2931fc 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -447,9 +447,12 @@ def frozen_sleep(monkeypatch):
 
 @pytest.fixture(scope="session")
 def app():
-    from airflow.www import app
+    from tests.test_utils.config import conf_vars
 
-    return app.create_app(testing=True)
+    with conf_vars({("webserver", "auth_rate_limited"): "False"}):
+        from airflow.www import app
+
+        yield app.create_app(testing=True)
 
 
 @pytest.fixture
diff --git a/tests/test_utils/www.py b/tests/test_utils/www.py
index d46e3f69ee..ea0295af2c 100644
--- a/tests/test_utils/www.py
+++ b/tests/test_utils/www.py
@@ -22,13 +22,13 @@ from unittest import mock
 from airflow.models import Log
 
 
-def client_with_login(app, **kwargs):
+def client_with_login(app, expected_response_code=302, **kwargs):
     patch_path = "airflow.www.fab_security.manager.check_password_hash"
     with mock.patch(patch_path) as check_password_hash:
         check_password_hash.return_value = True
         client = app.test_client()
         resp = client.post("/login/", data=kwargs)
-        assert resp.status_code == 302
+        assert resp.status_code == expected_response_code
     return client
 
 
diff --git a/tests/www/views/conftest.py b/tests/www/views/conftest.py
index e04b5ec96e..209a106df6 100644
--- a/tests/www/views/conftest.py
+++ b/tests/www/views/conftest.py
@@ -28,6 +28,7 @@ from airflow import settings
 from airflow.models import DagBag
 from airflow.www.app import create_app
 from tests.test_utils.api_connexion_utils import delete_user
+from tests.test_utils.config import conf_vars
 from tests.test_utils.decorators import dont_initialize_flask_app_submodules
 from tests.test_utils.www import client_with_login, client_without_login
 
@@ -62,7 +63,8 @@ def app(examples_dag_bag):
         ]
     )
     def factory():
-        return create_app(testing=True)
+        with conf_vars({("webserver", "auth_rate_limited"): "False"}):
+            return create_app(testing=True)
 
     app = factory()
     app.config["WTF_CSRF_ENABLED"] = False
@@ -110,6 +112,7 @@ def app(examples_dag_bag):
 
 @pytest.fixture()
 def admin_client(app):
+
     return client_with_login(app, username="test_admin", password="test_admin")
 
 
diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py
index 6939ed9dca..207e9a3d89 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -73,7 +73,12 @@ def log_app(backup_modules, log_path):
             "init_api_connexion",
         ]
     )
-    @conf_vars({("logging", "logging_config_class"): "airflow_local_settings.LOGGING_CONFIG"})
+    @conf_vars(
+        {
+            ("logging", "logging_config_class"): "airflow_local_settings.LOGGING_CONFIG",
+            ("webserver", "auth_rate_limited"): "False",
+        }
+    )
     def factory():
         app = create_app(testing=True)
         app.config["WTF_CSRF_ENABLED"] = False
diff --git a/tests/www/views/test_views_rate_limit.py b/tests/www/views/test_views_rate_limit.py
new file mode 100644
index 0000000000..150d38e59e
--- /dev/null
+++ b/tests/www/views/test_views_rate_limit.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.www.app import create_app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.decorators import dont_initialize_flask_app_submodules
+from tests.test_utils.www import client_with_login
+
+
+@pytest.fixture()
+def app_with_rate_limit_one(examples_dag_bag):
+    @dont_initialize_flask_app_submodules(
+        skip_all_except=[
+            "init_api_connexion",
+            "init_appbuilder",
+            "init_appbuilder_links",
+            "init_appbuilder_views",
+            "init_flash_views",
+            "init_jinja_globals",
+            "init_plugins",
+            "init_airflow_session_interface",
+            "init_check_user_active",
+        ]
+    )
+    def factory():
+        with conf_vars(
+            {("webserver", "auth_rate_limited"): "True", ("webserver", "auth_rate_limit"): "1 per 20 second"}
+        ):
+            return create_app(testing=True)
+
+    app = factory()
+    app.config["WTF_CSRF_ENABLED"] = False
+    return app
+
+
+def test_rate_limit_one(app_with_rate_limit_one):
+    client_with_login(
+        app_with_rate_limit_one, expected_response_code=302, username="test_admin", password="test_admin"
+    )
+    client_with_login(
+        app_with_rate_limit_one, expected_response_code=429, username="test_admin", password="test_admin"
+    )
+    client_with_login(
+        app_with_rate_limit_one, expected_response_code=429, username="test_admin", password="test_admin"
+    )
+
+
+def test_rate_limit_disabled(app):
+    client_with_login(app, expected_response_code=302, username="test_admin", password="test_admin")
+    client_with_login(app, expected_response_code=302, username="test_admin", password="test_admin")
+    client_with_login(app, expected_response_code=302, username="test_admin", password="test_admin")