Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/26 15:09:09 UTC

[airflow] branch v2-4-test updated (d005faf758 -> b68fb0960f)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a change to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


    from d005faf758 Add `cleartext` to spelling wordlist (#26447)
     new 5cd93d541e Doc: Fix typos in ``example_branch_datetime_operator`` (#26455)
     new 97c5124b16 Don't import kubernetes unless you have a V1Pod (#26496)
     new 2598efca1d Clarify owner links document (#26515)
     new fc9b588e3b Fix typo in release notes (#26530)
     new 8a9f0afe39 Fix grid view log try numbers (#26556)
     new 0308b177fc Log warning when secret backend kwargs is invalid (#26580)
     new e436cb95f4 Resolve warning about DISTINCT ON query on dags view (#26608)
     new 1a38896add Allow MapXComArg to resolve after serialization (#26591)
     new 493ddb2ea9 No missing user warning for public admin (#26611)
     new a6bc5d0eb1 Use COALESCE when ordering runs to handle NULL (#26626)
     new 85895b567f Check user is active (#26635)
     new 237f2240b0 Fix xcom arg.py .zip bug (#26636)
     new b68fb0960f Add requirements.txt example to "extending docker" (#26663)

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 RELEASE_NOTES.rst                                  |  2 +-
 airflow/configuration.py                           | 24 ++++++---
 .../example_branch_datetime_operator.py            |  6 +--
 airflow/models/xcom_arg.py                         | 60 +++++++++++++++++-----
 airflow/serialization/serialized_objects.py        |  2 +-
 airflow/www/app.py                                 |  7 ++-
 airflow/www/extensions/init_security.py            | 11 ++++
 airflow/www/fab_security/manager.py                |  2 +-
 .../dag/details/taskInstance/Logs/index.test.tsx   | 16 +++---
 .../js/dag/details/taskInstance/Logs/index.tsx     | 28 +++++-----
 airflow/www/utils.py                               | 44 +++++++++++++---
 airflow/www/views.py                               |  7 +--
 docs/apache-airflow/howto/add-owner-links.rst      | 29 ++++++-----
 docs/docker-stack/build.rst                        | 34 ++++++++++--
 .../Dockerfile                                     |  5 +-
 .../add-requirement-packages/requirements.txt      |  2 +
 tests/models/test_xcom_arg.py                      |  4 +-
 tests/test_utils/decorators.py                     |  1 +
 tests/www/views/conftest.py                        |  1 +
 tests/www/views/test_session.py                    | 14 +++++
 tests/www/views/test_views_base.py                 | 13 ++++-
 21 files changed, 229 insertions(+), 83 deletions(-)
 copy docs/docker-stack/docker-examples/extending/{custom-providers => add-requirement-packages}/Dockerfile (89%)
 create mode 100644 docs/docker-stack/docker-examples/extending/add-requirement-packages/requirements.txt


[airflow] 08/13: Allow MapXComArg to resolve after serialization (#26591)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1a38896add94d0caa535358b67b5031609c65451
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Sat Sep 24 04:33:01 2022 +0800

    Allow MapXComArg to resolve after serialization (#26591)

    This is useful for cases where we want to resolve an XCom without
    running a worker, e.g. to display the value in the UI.

    Since we don't want to actually call the mapper function in this case
    (the function is arbitrary code, and not running it is the entire point
    of serializing operators), "resolving" the XComArg here merely produces
    a quasi-meaningful string representation instead of the actual value
    we'd get in the worker.

    Also note that this only affects a very small number of cases: once a
    worker has run the task instance, RenderedTaskInstanceFields stores the
    real resolved value and takes over the UI representation, so this fake
    resolving logic is never reached at all.
    
    (cherry picked from commit 3e01c0d97aeefce303e1fdb5cef160f192cce4fa)
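
For illustration, a minimal self-contained sketch of what this fake resolve produces. The helper is condensed from the diff below; the sample source string and value are hypothetical:

    import contextlib

    def _get_callable_name(f) -> str:
        """Describe a callable, or its serialized source string, by name."""
        if callable(f):
            return f.__name__
        # Parse the source for the token after "def"; never evaluate the code.
        with contextlib.suppress(Exception):
            kw, name, _ = f.lstrip().split(None, 2)
            if kw == "def":
                return name
        return "<function>"

    # In the scheduler the callables arrive as source strings, so "resolving"
    # builds a readable call chain instead of running arbitrary user code.
    source = "def double(x):\n    return x * 2"
    value = "xcom_value"
    print(f"{_get_callable_name(source)}({value})")  # -> double(x):(xcom_value)
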
---
 airflow/models/xcom_arg.py    | 48 ++++++++++++++++++++++++++++++++++++-------
 tests/models/test_xcom_arg.py |  4 ++--
 2 files changed, 43 insertions(+), 9 deletions(-)

diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 2fb60195ef..9be82976ae 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -14,10 +14,12 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 from __future__ import annotations
 
+import contextlib
 import inspect
-from typing import TYPE_CHECKING, Any, Callable, Iterator, Mapping, Sequence, overload
+from typing import TYPE_CHECKING, Any, Callable, Iterator, Mapping, Sequence, Union, overload
 
 from sqlalchemy import func
 from sqlalchemy.orm import Session
@@ -35,6 +37,11 @@ if TYPE_CHECKING:
     from airflow.models.dag import DAG
     from airflow.models.operator import Operator
 
+# Callable objects contained by MapXComArg. We only accept callables from
+# the user; in a serialized XComArg they are represented by their source
+# strings instead, for safety (those callables are arbitrary user code).
+MapCallables = Sequence[Union[Callable[[Any], Any], str]]
+
 
 class XComArg(DependencyMixin):
     """Reference to an XCom value pushed from another operator.
@@ -322,15 +329,39 @@ class PlainXComArg(XComArg):
         raise XComNotFound(context["ti"].dag_id, task_id, self.key)
 
 
+def _get_callable_name(f: Callable | str) -> str:
+    """Try to "describe" a callable by getting its name."""
+    if callable(f):
+        return f.__name__
+    # Parse the source to find whatever is behind "def". For safety, we don't
+    # want to evaluate the code in any meaningful way!
+    with contextlib.suppress(Exception):
+        kw, name, _ = f.lstrip().split(None, 2)
+        if kw == "def":
+            return name
+    return "<function>"
+
+
 class _MapResult(Sequence):
-    def __init__(self, value: Sequence | dict, callables: Sequence[Callable[[Any], Any]]) -> None:
+    def __init__(self, value: Sequence | dict, callables: MapCallables) -> None:
         self.value = value
         self.callables = callables
 
     def __getitem__(self, index: Any) -> Any:
         value = self.value[index]
-        for f in self.callables:
-            value = f(value)
+
+        # In the worker, we can access all actual callables. Call them.
+        callables = [f for f in self.callables if callable(f)]
+        if len(callables) == len(self.callables):
+            for f in callables:
+                value = f(value)
+            return value
+
+        # In the scheduler, we don't have access to the actual callables, nor
+        # do we want to run them since they're arbitrary code. This builds a
+        # string to represent the call chain in the UI or logs instead.
+        for v in self.callables:
+            value = f"{_get_callable_name(v)}({value})"
         return value
 
     def __len__(self) -> int:
@@ -342,9 +373,11 @@ class MapXComArg(XComArg):
 
     This is based on an XComArg, but also applies a series of "transforms" that
     convert the pulled XCom value.
+
+    :meta private:
     """
 
-    def __init__(self, arg: XComArg, callables: Sequence[Callable[[Any], Any]]) -> None:
+    def __init__(self, arg: XComArg, callables: MapCallables) -> None:
         for c in callables:
             if getattr(c, "_airflow_is_task_decorator", False):
                 raise ValueError("map() argument must be a plain function, not a @task operator")
@@ -352,12 +385,13 @@ class MapXComArg(XComArg):
         self.callables = callables
 
     def __repr__(self) -> str:
-        return f"{self.arg!r}.map([{len(self.callables)} functions])"
+        map_calls = "".join(f".map({_get_callable_name(f)})" for f in self.callables)
+        return f"{self.arg!r}{map_calls}"
 
     def _serialize(self) -> dict[str, Any]:
         return {
             "arg": serialize_xcom_arg(self.arg),
-            "callables": [inspect.getsource(c) for c in self.callables],
+            "callables": [inspect.getsource(c) if callable(c) else c for c in self.callables],
         }
 
     @classmethod
diff --git a/tests/models/test_xcom_arg.py b/tests/models/test_xcom_arg.py
index 18cbe87de1..1f9a342c02 100644
--- a/tests/models/test_xcom_arg.py
+++ b/tests/models/test_xcom_arg.py
@@ -211,14 +211,14 @@ def test_xcom_zip(dag_maker, session, fillvalue, expected_results):
 
     # Run "push_letters" and "push_numbers".
     decision = dr.task_instance_scheduling_decisions(session=session)
-    assert decision.schedulable_tis and all(ti.task_id.startswith("push_") for ti in decision.schedulable_tis)
+    assert sorted(ti.task_id for ti in decision.schedulable_tis) == ["push_letters", "push_numbers"]
     for ti in decision.schedulable_tis:
         ti.run(session=session)
     session.commit()
 
     # Run "pull".
     decision = dr.task_instance_scheduling_decisions(session=session)
-    assert decision.schedulable_tis and all(ti.task_id == "pull" for ti in decision.schedulable_tis)
+    assert sorted(ti.task_id for ti in decision.schedulable_tis) == ["pull"] * len(expected_results)
     for ti in decision.schedulable_tis:
         ti.run(session=session)
 


[airflow] 12/13: Fix xcom arg.py .zip bug (#26636)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 237f2240b0b011c9d97858ba8f5c63795f692cdf
Author: Robert J. McGinness <co...@gmail.com>
AuthorDate: Mon Sep 26 05:02:55 2022 -0400

    Fix xcom arg.py .zip bug (#26636)
    
    (cherry picked from commit f219bfbe22e662a8747af19d688bbe843e1a953d)
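
The message is terse, but the diff below consistently swaps identity checks (``is NOTSET``) for ``isinstance(..., ArgNotSet)``. A sketch of why that distinction matters, with a sentinel defined like Airflow's ``airflow.utils.types.NOTSET`` and ``copy.deepcopy`` standing in for whatever round-trip bites the ``.zip`` path:

    import copy

    class ArgNotSet:
        """Sentinel type for unset arguments (mirrors airflow.utils.types)."""

    NOTSET = ArgNotSet()

    # A copy or (de)serialization round-trip yields a *new* sentinel instance,
    # so identity checks miss it while the isinstance check still matches.
    roundtripped = copy.deepcopy(NOTSET)
    print(roundtripped is NOTSET)               # False -> old check breaks
    print(isinstance(roundtripped, ArgNotSet))  # True  -> new check holds
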
---
 airflow/models/xcom_arg.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/airflow/models/xcom_arg.py b/airflow/models/xcom_arg.py
index 9be82976ae..b70f26cd4f 100644
--- a/airflow/models/xcom_arg.py
+++ b/airflow/models/xcom_arg.py
@@ -31,7 +31,7 @@ from airflow.models.xcom import XCOM_RETURN_KEY
 from airflow.utils.context import Context
 from airflow.utils.edgemodifier import EdgeModifier
 from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.types import NOTSET
+from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
     from airflow.models.dag import DAG
@@ -322,7 +322,7 @@ class PlainXComArg(XComArg):
     def resolve(self, context: Context, session: Session = NEW_SESSION) -> Any:
         task_id = self.operator.task_id
         result = context["ti"].xcom_pull(task_ids=task_id, key=str(self.key), default=NOTSET, session=session)
-        if result is not NOTSET:
+        if not isinstance(result, ArgNotSet):
             return result
         if self.key == XCOM_RETURN_KEY:
             return None
@@ -437,7 +437,7 @@ class _ZipResult(Sequence):
 
     def __len__(self) -> int:
         lengths = (len(v) for v in self.values)
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return min(lengths)
         return max(lengths)
 
@@ -460,13 +460,13 @@ class ZipXComArg(XComArg):
         args_iter = iter(self.args)
         first = repr(next(args_iter))
         rest = ", ".join(repr(arg) for arg in args_iter)
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return f"{first}.zip({rest})"
         return f"{first}.zip({rest}, fillvalue={self.fillvalue!r})"
 
     def _serialize(self) -> dict[str, Any]:
         args = [serialize_xcom_arg(arg) for arg in self.args]
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return {"args": args}
         return {"args": args, "fillvalue": self.fillvalue}
 
@@ -486,7 +486,7 @@ class ZipXComArg(XComArg):
         ready_lengths = [length for length in all_lengths if length is not None]
         if len(ready_lengths) != len(self.args):
             return None  # If any of the referenced XComs is not ready, we are not ready either.
-        if self.fillvalue is NOTSET:
+        if isinstance(self.fillvalue, ArgNotSet):
             return min(ready_lengths)
         return max(ready_lengths)
 


[airflow] 11/13: Check user is active (#26635)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 85895b567f5f70ffc497d84570223c5fb80f7de4
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Sep 23 13:28:33 2022 -0700

    Check user is active (#26635)
    
    (cherry picked from commit 59707cdf7eacb698ca375b5220af30a39ca1018c)
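
A self-contained sketch of the pattern with a hypothetical app and user object; the real hook in the diff below also calls flask_login's ``logout_user()`` and redirects to the appbuilder login endpoint:

    from flask import Flask, g, redirect

    app = Flask(__name__)

    class User:
        def __init__(self, active: bool):
            self.is_anonymous = False
            self.is_active = active

    @app.before_request
    def check_user_active():
        # Kick logged-in but deactivated users back to login before any view runs.
        if g.user is not None and not g.user.is_anonymous and not g.user.is_active:
            return redirect("/login")

    @app.route("/home")
    def home():
        return "DAGs"

    with app.test_request_context("/home"):
        g.user = User(active=False)
        resp = app.preprocess_request()  # runs the before_request hooks
        print(resp.status_code, resp.headers["Location"])  # 302 /login
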
---
 airflow/www/app.py                      |  7 ++++++-
 airflow/www/extensions/init_security.py | 11 +++++++++++
 tests/test_utils/decorators.py          |  1 +
 tests/www/views/conftest.py             |  1 +
 tests/www/views/test_session.py         | 14 ++++++++++++++
 tests/www/views/test_views_base.py      | 13 +++++++++++--
 6 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/airflow/www/app.py b/airflow/www/app.py
index b67314c99a..d0c38b2936 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -39,7 +39,11 @@ from airflow.www.extensions.init_dagbag import init_dagbag
 from airflow.www.extensions.init_jinja_globals import init_jinja_globals
 from airflow.www.extensions.init_manifest_files import configure_manifest_files
 from airflow.www.extensions.init_robots import init_robots
-from airflow.www.extensions.init_security import init_api_experimental_auth, init_xframe_protection
+from airflow.www.extensions.init_security import (
+    init_api_experimental_auth,
+    init_check_user_active,
+    init_xframe_protection,
+)
 from airflow.www.extensions.init_session import init_airflow_session_interface
 from airflow.www.extensions.init_views import (
     init_api_connexion,
@@ -152,6 +156,7 @@ def create_app(config=None, testing=False):
         init_jinja_globals(flask_app)
         init_xframe_protection(flask_app)
         init_airflow_session_interface(flask_app)
+        init_check_user_active(flask_app)
     return flask_app
 
 
diff --git a/airflow/www/extensions/init_security.py b/airflow/www/extensions/init_security.py
index 1d96e351df..b967b74084 100644
--- a/airflow/www/extensions/init_security.py
+++ b/airflow/www/extensions/init_security.py
@@ -19,6 +19,9 @@ from __future__ import annotations
 import logging
 from importlib import import_module
 
+from flask import g, redirect, url_for
+from flask_login import logout_user
+
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException, AirflowException
 
@@ -60,3 +63,11 @@ def init_api_experimental_auth(app):
         except ImportError as err:
             log.critical("Cannot import %s for API authentication due to: %s", backend, err)
             raise AirflowException(err)
+
+
+def init_check_user_active(app):
+    @app.before_request
+    def check_user_active():
+        if g.user is not None and not g.user.is_anonymous and not g.user.is_active:
+            logout_user()
+            return redirect(url_for(app.appbuilder.sm.auth_view.endpoint + ".login"))
diff --git a/tests/test_utils/decorators.py b/tests/test_utils/decorators.py
index bdb8d67807..d0b71b502c 100644
--- a/tests/test_utils/decorators.py
+++ b/tests/test_utils/decorators.py
@@ -45,6 +45,7 @@ def dont_initialize_flask_app_submodules(_func=None, *, skip_all_except=None):
             "init_xframe_protection",
             "init_airflow_session_interface",
             "init_appbuilder",
+            "init_check_user_active",
         ]
 
         @functools.wraps(f)
diff --git a/tests/www/views/conftest.py b/tests/www/views/conftest.py
index 02c857180f..ad562385bc 100644
--- a/tests/www/views/conftest.py
+++ b/tests/www/views/conftest.py
@@ -58,6 +58,7 @@ def app(examples_dag_bag):
             "init_jinja_globals",
             "init_plugins",
             "init_airflow_session_interface",
+            "init_check_user_active",
         ]
     )
     def factory():
diff --git a/tests/www/views/test_session.py b/tests/www/views/test_session.py
index 090bc503a8..3802399264 100644
--- a/tests/www/views/test_session.py
+++ b/tests/www/views/test_session.py
@@ -88,3 +88,17 @@ def test_session_id_rotates(app, user_client):
     new_session_cookie = get_session_cookie(user_client)
     assert new_session_cookie is not None
     assert old_session_cookie.value != new_session_cookie.value
+
+
+def test_check_active_user(app, user_client):
+    user = app.appbuilder.sm.find_user(username="test_user")
+    user.active = False
+    resp = user_client.get("/home")
+    assert resp.status_code == 302
+    assert "/login" in resp.headers.get("Location")
+
+    # And they were logged out
+    user.active = True
+    resp = user_client.get("/home")
+    assert resp.status_code == 302
+    assert "/login" in resp.headers.get("Location")
diff --git a/tests/www/views/test_views_base.py b/tests/www/views/test_views_base.py
index d0acc4df27..9c9c4f0aba 100644
--- a/tests/www/views/test_views_base.py
+++ b/tests/www/views/test_views_base.py
@@ -30,9 +30,18 @@ from tests.test_utils.config import conf_vars
 from tests.test_utils.www import check_content_in_response, check_content_not_in_response
 
 
-def test_index(admin_client):
+def test_index_redirect(admin_client):
+    resp = admin_client.get('/')
+    assert resp.status_code == 302
+    assert '/home' in resp.headers.get("Location")
+
+    resp = admin_client.get('/', follow_redirects=True)
+    check_content_in_response('DAGs', resp)
+
+
+def test_homepage_query_count(admin_client):
     with assert_queries_count(16):
-        resp = admin_client.get('/', follow_redirects=True)
+        resp = admin_client.get('/home')
     check_content_in_response('DAGs', resp)
 
 


[airflow] 01/13: Doc: Fix typos in ``example_branch_datetime_operator`` (#26455)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5cd93d541e715666f3117607cba003372830eb0e
Author: Shimoyama <60...@users.noreply.github.com>
AuthorDate: Sun Sep 18 18:04:47 2022 +0900

    Doc: Fix typos in ``example_branch_datetime_operator`` (#26455)
    
    (cherry picked from commit 4ed455674efde607180b9ebf05cd505348bcb8bd)
---
 airflow/example_dags/example_branch_datetime_operator.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/airflow/example_dags/example_branch_datetime_operator.py b/airflow/example_dags/example_branch_datetime_operator.py
index 3f37db7889..e6d91f675d 100644
--- a/airflow/example_dags/example_branch_datetime_operator.py
+++ b/airflow/example_dags/example_branch_datetime_operator.py
@@ -48,7 +48,7 @@ cond1 = BranchDateTimeOperator(
     dag=dag1,
 )
 
-# Run empty_task_1 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
+# Run empty_task_11 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
 cond1 >> [empty_task_11, empty_task_21]
 # [END howto_branch_datetime_operator]
 
@@ -74,7 +74,7 @@ cond2 = BranchDateTimeOperator(
 )
 
 # Since target_lower happens after target_upper, target_upper will be moved to the following day
-# Run empty_task_1 if cond2 executes between 15:00:00, and 00:00:00 of the following day
+# Run empty_task_12 if cond2 executes between 15:00:00, and 00:00:00 of the following day
 cond2 >> [empty_task_12, empty_task_22]
 # [END howto_branch_datetime_operator_next_day]
 
@@ -99,6 +99,6 @@ cond3 = BranchDateTimeOperator(
     dag=dag3,
 )
 
-# Run empty_task_3 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
+# Run empty_task_13 if cond3 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
 cond3 >> [empty_task_13, empty_task_23]
 # [END howto_branch_datetime_operator_logical_date]


[airflow] 04/13: Fix typo in release notes (#26530)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fc9b588e3bd15482d540d68c8a58261e6a6a8d90
Author: RenGeng <ge...@hotmail.com>
AuthorDate: Tue Sep 20 21:17:33 2022 +0200

    Fix typo in release notes (#26530)
    
    (cherry picked from commit f9445d3f8a229a2f7c48a970918f94f9b98386b9)
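
The corrected sentence refers to the ``schedule`` parameter new in 2.4. The code block it introduces is cut off by this hunk, but usage presumably looks like the following sketch (hypothetical DAG id):

    from datetime import datetime
    from airflow import DAG, Dataset

    dataset2 = Dataset(uri="s3://bucket/prefix")

    # The DAG runs whenever dataset2 is updated: pass the new ``schedule``
    # parameter (not ``scheduler``) a list of one or more Datasets.
    with DAG(
        dag_id="dataset_consumer",
        start_date=datetime(2022, 9, 1),
        schedule=[dataset2],
    ):
        ...
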
---
 RELEASE_NOTES.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst
index de7c3b3d89..6346d2a93a 100644
--- a/RELEASE_NOTES.rst
+++ b/RELEASE_NOTES.rst
@@ -48,7 +48,7 @@ A dataset is identified by a URI:
     # Or you can use a scheme to show where it lives.
     dataset2 = Dataset(uri='s3://bucket/prefix')
 
-To create a DAG that runs whenever a Dataset is updated use the new ``scheduler`` parameter (see below) and
+To create a DAG that runs whenever a Dataset is updated use the new ``schedule`` parameter (see below) and
 pass a list of 1 or more Datasets:
 
 ..  code-block:: python


[airflow] 13/13: Add requirements.txt example to "extending docker" (#26663)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b68fb0960fc288ada3c3de2643c44f0113f0a241
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Sep 26 00:19:08 2022 +0200

    Add requirements.txt example to "extending docker" (#26663)
    
    When extending the image, there was an example of adding individual
    packages from PyPI, but none for requirements.txt. This PR adds one.
    
    (cherry picked from commit 25fdb2cc9a48a335079ff4a8d1be986e62a39f29)
---
 docs/docker-stack/build.rst                        | 34 ++++++++++++++++++++--
 .../extending/add-requirement-packages/Dockerfile  | 21 +++++++++++++
 .../add-requirement-packages/requirements.txt      |  2 ++
 3 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/docs/docker-stack/build.rst b/docs/docker-stack/build.rst
index 1721615953..5849e17709 100644
--- a/docs/docker-stack/build.rst
+++ b/docs/docker-stack/build.rst
@@ -27,7 +27,9 @@ Quick start scenarios of image extending
 ----------------------------------------
 
 The most common scenarios where you want to build your own image are adding a new ``apt`` package,
-adding a new ``PyPI`` dependency and embedding DAGs into the image.
+adding a new ``PyPI`` dependency (either individually or via requirements.txt) and embedding DAGs
+into the image.
+
 Example Dockerfiles for those scenarios are below, and you can read further
 for more complex cases which might involve either extending or customizing the image. You will find
 more information about more complex scenarios below, but if your goal is to quickly extend the Airflow
@@ -46,8 +48,8 @@ switch to the ``root`` user when running the ``apt`` commands, but do not forget
     :end-before: [END Dockerfile]
 
 
-Adding a new ``PyPI`` package
-.............................
+Adding new ``PyPI`` packages individually
+.........................................
 
 The following example adds ``lxml`` python package from PyPI to the image. When adding packages via
 ``pip`` you need to use the ``airflow`` user rather than ``root``. Attempts to install ``pip`` packages
@@ -58,6 +60,19 @@ as ``root`` will fail with an appropriate error message.
     :start-after: [START Dockerfile]
     :end-before: [END Dockerfile]
 
+Adding packages from requirements.txt
+.....................................
+
+The following example adds a few Python packages from PyPI, listed in ``requirements.txt``, to the image.
+Note that, as when adding individual packages, you need to use the ``airflow`` user rather than
+``root``. Attempts to install ``pip`` packages as ``root`` will fail with an appropriate error message.
+
+.. exampleinclude:: docker-examples/extending/add-requirement-packages/Dockerfile
+    :language: Dockerfile
+    :start-after: [START Dockerfile]
+    :end-before: [END Dockerfile]
+
+
 Embedding DAGs
 ..............
 
@@ -363,6 +378,19 @@ The following example adds ``lxml`` python package from PyPI to the image.
     :start-after: [START Dockerfile]
     :end-before: [END Dockerfile]
 
+Example of adding packages from requirements.txt
+................................................
+
+The following example adds a few Python packages from PyPI, listed in ``requirements.txt``, to the image.
+Note that, as when adding individual packages, you need to use the ``airflow`` user rather than
+``root``. Attempts to install ``pip`` packages as ``root`` will fail with an appropriate error message.
+
+.. exampleinclude:: docker-examples/extending/add-requirement-packages/Dockerfile
+    :language: Dockerfile
+    :start-after: [START Dockerfile]
+    :end-before: [END Dockerfile]
+
+
 Example when writable directory is needed
 .........................................
 
diff --git a/docs/docker-stack/docker-examples/extending/add-requirement-packages/Dockerfile b/docs/docker-stack/docker-examples/extending/add-requirement-packages/Dockerfile
new file mode 100644
index 0000000000..503ba3ae7d
--- /dev/null
+++ b/docs/docker-stack/docker-examples/extending/add-requirement-packages/Dockerfile
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is an example Dockerfile. It is not intended for PRODUCTION use
+# [START Dockerfile]
+FROM apache/airflow:2.5.0.dev0
+COPY requirements.txt /
+RUN pip install --no-cache-dir -r /requirements.txt
+# [END Dockerfile]
diff --git a/docs/docker-stack/docker-examples/extending/add-requirement-packages/requirements.txt b/docs/docker-stack/docker-examples/extending/add-requirement-packages/requirements.txt
new file mode 100644
index 0000000000..595508023e
--- /dev/null
+++ b/docs/docker-stack/docker-examples/extending/add-requirement-packages/requirements.txt
@@ -0,0 +1,2 @@
+lxml
+beautifulsoup4


[airflow] 05/13: Fix grid view log try numbers (#26556)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8a9f0afe39d3e5e6f72fb7c692cdd0da51712824
Author: Brent Bovenzi <br...@astronomer.io>
AuthorDate: Thu Sep 22 15:39:14 2022 -0400

    Fix grid view log try numbers (#26556)
    
    * redo how try numbers are displayed in grid view logs
    
    * rename attempt -> tryNumber
    
    (cherry picked from commit 6a69ad033fdc224aee14b8c83fdc1b672d17ac20)
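
The fix itself is TypeScript; what changed in ``getLinkIndexes`` is that buttons are now built from 1-based try numbers instead of 0-based array indexes. A Python paraphrase of the new behavior (a sketch, not the actual TSX):

    def get_link_indexes(try_number, show_external_log_redirect=False):
        """One 1-based entry per attempt, routed to external or internal links."""
        internal, external = [], []
        for index in range(try_number or 0):
            try_num = index + 1
            (external if show_external_log_redirect else internal).append(try_num)
        return internal, external

    print(get_link_indexes(2))  # ([1, 2], []) -- buttons "1" and "2", never "0"
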
---
 .../dag/details/taskInstance/Logs/index.test.tsx   | 16 ++++++-------
 .../js/dag/details/taskInstance/Logs/index.tsx     | 28 ++++++++++++----------
 2 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/airflow/www/static/js/dag/details/taskInstance/Logs/index.test.tsx b/airflow/www/static/js/dag/details/taskInstance/Logs/index.test.tsx
index d44f318b05..7a94fd64fc 100644
--- a/airflow/www/static/js/dag/details/taskInstance/Logs/index.test.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/Logs/index.test.tsx
@@ -85,7 +85,7 @@ describe('Test Logs Component.', () => {
       dagRunId: 'dummyDagRunId',
       fullContent: false,
       taskId: 'dummyTaskId',
-      taskTryNumber: 1,
+      taskTryNumber: 2,
     });
   });
 
@@ -146,7 +146,7 @@ describe('Test Logs Component.', () => {
       fullContent: false,
       mapIndex: 1,
       taskId: 'dummyTaskId',
-      taskTryNumber: 1,
+      taskTryNumber: 2,
     });
   });
 
@@ -172,18 +172,18 @@ describe('Test Logs Component.', () => {
       dagRunId: 'dummyDagRunId',
       fullContent: false,
       taskId: 'dummyTaskId',
-      taskTryNumber: 1,
+      taskTryNumber: 2,
     });
-    const attemptButton2 = getByTestId('log-attempt-select-button-2');
+    const attemptButton1 = getByTestId('log-attempt-select-button-1');
 
-    fireEvent.click(attemptButton2);
+    fireEvent.click(attemptButton1);
 
     expect(useTaskLogMock).toHaveBeenLastCalledWith({
       dagId: 'dummyDagId',
       dagRunId: 'dummyDagRunId',
       fullContent: false,
       taskId: 'dummyTaskId',
-      taskTryNumber: 2,
+      taskTryNumber: 1,
     });
   });
 
@@ -203,7 +203,7 @@ describe('Test Logs Component.', () => {
       dagRunId: 'dummyDagRunId',
       fullContent: false,
       taskId: 'dummyTaskId',
-      taskTryNumber: 1,
+      taskTryNumber: 2,
     });
     const fullContentCheckbox = getByTestId('full-content-checkbox');
 
@@ -214,7 +214,7 @@ describe('Test Logs Component.', () => {
       dagRunId: 'dummyDagRunId',
       fullContent: true,
       taskId: 'dummyTaskId',
-      taskTryNumber: 1,
+      taskTryNumber: 2,
     });
   });
 });
diff --git a/airflow/www/static/js/dag/details/taskInstance/Logs/index.tsx b/airflow/www/static/js/dag/details/taskInstance/Logs/index.tsx
index a5324f2c88..8b35c3bc3a 100644
--- a/airflow/www/static/js/dag/details/taskInstance/Logs/index.tsx
+++ b/airflow/www/static/js/dag/details/taskInstance/Logs/index.tsx
@@ -61,13 +61,12 @@ const getLinkIndexes = (tryNumber: number | undefined): Array<Array<number>> =>
   const externalIndexes: Array<number> = [];
 
   if (tryNumber) {
-    [...Array(tryNumber + 1 || 0)].forEach((_, index) => {
-      if (index === 0 && tryNumber < 2) return;
-      const isExternal = index !== 0 && showExternalLogRedirect;
-      if (isExternal) {
-        externalIndexes.push(index);
+    [...Array(tryNumber)].forEach((_, index) => {
+      const tryNum = index + 1;
+      if (showExternalLogRedirect) {
+        externalIndexes.push(tryNum);
       } else {
-        internalIndexes.push(index);
+        internalIndexes.push(tryNum);
       }
     });
   }
@@ -99,18 +98,21 @@ const Logs = ({
   tryNumber,
 }: Props) => {
   const [internalIndexes, externalIndexes] = getLinkIndexes(tryNumber);
-  const [selectedAttempt, setSelectedAttempt] = useState(1);
+  const [selectedTryNumber, setSelectedTryNumber] = useState<number | undefined>();
   const [shouldRequestFullContent, setShouldRequestFullContent] = useState(false);
   const [wrap, setWrap] = useState(getMetaValue('default_wrap') === 'True');
   const [logLevelFilters, setLogLevelFilters] = useState<Array<LogLevelOption>>([]);
   const [fileSourceFilters, setFileSourceFilters] = useState<Array<FileSourceOption>>([]);
   const { timezone } = useTimezone();
+
+  //
+  const taskTryNumber = selectedTryNumber || tryNumber || 1;
   const { data, isSuccess } = useTaskLog({
     dagId,
     dagRunId,
     taskId,
     mapIndex,
-    taskTryNumber: selectedAttempt,
+    taskTryNumber,
     fullContent: shouldRequestFullContent,
   });
 
@@ -144,8 +146,8 @@ const Logs = ({
   useEffect(() => {
     // Reset fileSourceFilters and selected attempt when changing to
     // a task that do not have those filters anymore.
-    if (!internalIndexes.includes(selectedAttempt) && internalIndexes.length) {
-      setSelectedAttempt(internalIndexes[0]);
+    if (taskTryNumber > (tryNumber || 1)) {
+      setSelectedTryNumber(undefined);
     }
 
     if (data && fileSourceFilters.length > 0
@@ -155,7 +157,7 @@ const Logs = ({
       )) {
       setFileSourceFilters([]);
     }
-  }, [data, internalIndexes, fileSourceFilters, fileSources, selectedAttempt]);
+  }, [data, fileSourceFilters, fileSources, taskTryNumber, tryNumber]);
 
   return (
     <>
@@ -167,9 +169,9 @@ const Logs = ({
               {internalIndexes.map((index) => (
                 <Button
                   key={index}
-                  variant={selectedAttempt === index ? 'solid' : 'ghost'}
+                  variant={taskTryNumber === index ? 'solid' : 'ghost'}
                   colorScheme="blue"
-                  onClick={() => setSelectedAttempt(index)}
+                  onClick={() => setSelectedTryNumber(index)}
                   data-testid={`log-attempt-select-button-${index}`}
                 >
                   {index}


[airflow] 02/13: Don't import kubernetes unless you have a V1Pod (#26496)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 97c5124b1647bc27392e09a86d2c032b12bc6eb8
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Mon Sep 19 12:04:23 2022 -0700

    Don't import kubernetes unless you have a V1Pod (#26496)
    
    When serializing a DAG which doesn't involve a k8s pod, we can avoid importing the kubernetes library.
    
    (cherry picked from commit 9ea7b1f7c6e62fceb895f7414b3e707888806573)
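
A self-contained sketch of the pattern: a cheap ``__class__.__name__`` string test short-circuits before anything that could trigger the heavy import. The fallback path here is invented for the sketch:

    import sys

    def has_kubernetes() -> bool:
        # Stand-in for Airflow's _has_kubernetes(); importing the kubernetes
        # client is slow, so only attempt it when it could possibly matter.
        try:
            import kubernetes.client  # noqa: F401
            return True
        except ImportError:
            return False

    def serialize(var):
        # Only objects whose class is literally named "V1Pod" can pass the
        # isinstance check, so everything else skips the import entirely.
        if var.__class__.__name__ == "V1Pod" and has_kubernetes():
            ...  # pod-specific encoding would go here
        return repr(var)

    print(serialize(42))                # no kubernetes import attempted
    print("kubernetes" in sys.modules)  # False unless a V1Pod was serialized
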
---
 airflow/serialization/serialized_objects.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 969b6014db..87fd56882a 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -399,7 +399,7 @@ class BaseSerialization:
             return cls._encode({str(k): cls.serialize(v) for k, v in var.items()}, type_=DAT.DICT)
         elif isinstance(var, list):
             return [cls.serialize(v) for v in var]
-        elif _has_kubernetes() and isinstance(var, k8s.V1Pod):
+        elif var.__class__.__name__ == 'V1Pod' and _has_kubernetes() and isinstance(var, k8s.V1Pod):
             json_pod = PodGenerator.serialize_pod(var)
             return cls._encode(json_pod, type_=DAT.POD)
         elif isinstance(var, DAG):


[airflow] 07/13: Resolve warning about DISTINCT ON query on dags view (#26608)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e436cb95f441312f69eaf62153a2c57903f950ad
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Fri Sep 23 01:52:26 2022 -0700

    Resolve warning about DISTINCT ON query on dags view (#26608)
    
    Closes #26607
    
    (cherry picked from commit 55d11464c047d2e74f34cdde75d90b633a231df2)
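
The warning stems from passing a column to ``Query.distinct()``, which renders the PostgreSQL-only ``DISTINCT ON``; wrapping the column in ``func.distinct()`` renders plain ``DISTINCT`` on every backend. A runnable sketch with an in-memory SQLite stand-in for ``DagTag``:

    from sqlalchemy import Column, Integer, String, create_engine, func
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class DagTag(Base):  # minimal stand-in for airflow.models.DagTag
        __tablename__ = "dag_tag"
        id = Column(Integer, primary_key=True)
        name = Column(String)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        # session.query(DagTag.name).distinct(DagTag.name) would render
        # "DISTINCT ON", which SQLAlchemy warns about outside PostgreSQL.
        # func.distinct() is portable:
        print(session.query(func.distinct(DagTag.name)).all())  # []
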
---
 airflow/www/views.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 65e825510d..cc85e23b11 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -707,7 +707,7 @@ class Airflow(AirflowBaseView):
                 else:
                     dag.can_delete = (permissions.ACTION_CAN_DELETE, dag_resource_name) in user_permissions
 
-            dagtags = session.query(DagTag.name).distinct(DagTag.name).all()
+            dagtags = session.query(func.distinct(DagTag.name)).all()
             tags = [
                 {"name": name, "selected": bool(arg_tags_filter and name in arg_tags_filter)}
                 for name, in dagtags


[airflow] 09/13: No missing user warning for public admin (#26611)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 493ddb2ea9b49b36e8bb79414c8ed705f952a006
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Thu Sep 22 22:05:53 2022 -0700

    No missing user warning for public admin (#26611)
    
    If Airflow has been configured such that the public role is admin, there's no need to add users, so we shouldn't warn when there are none.
    
    (cherry picked from commit f01eed6490acd3bb3a58824e7388c4c3cd50ae29)
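
A tiny sketch of the new guard, with names mirroring the diff: when the public role *is* the admin role, an empty user table is expected and the warning would just be noise:

    def should_warn_no_users(user_count, auth_role_public, auth_role_admin):
        # Warn about a missing user only when anonymous visitors are not
        # already granted the admin role.
        return user_count == 0 and auth_role_public != auth_role_admin

    print(should_warn_no_users(0, "Public", "Admin"))  # True  -> warn
    print(should_warn_no_users(0, "Admin", "Admin"))   # False -> stay quiet
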
---
 airflow/www/fab_security/manager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/www/fab_security/manager.py b/airflow/www/fab_security/manager.py
index 6fd65f2e4d..91d2a44b88 100644
--- a/airflow/www/fab_security/manager.py
+++ b/airflow/www/fab_security/manager.py
@@ -820,7 +820,7 @@ class BaseSecurityManager:
         if self.auth_role_admin not in self.builtin_roles:
             self.add_role(self.auth_role_admin)
         self.add_role(self.auth_role_public)
-        if self.count_users() == 0:
+        if self.count_users() == 0 and self.auth_role_public != self.auth_role_admin:
             log.warning(LOGMSG_WAR_SEC_NO_USER)
 
     def reset_password(self, userid, password):


[airflow] 10/13: Use COALESCE when ordering runs to handle NULL (#26626)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a6bc5d0eb12c70e2552be030fd3638ef4aab7016
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Mon Sep 26 20:59:08 2022 +0800

    Use COALESCE when ordering runs to handle NULL (#26626)
    
    Data interval columns are NULL for runs created before 2.3, but SQL's
    NULL-sorting logic would make those old runs always appear first. In a
    perfect world we'd want to sort by get_run_data_interval(), but that's
    not efficient, so instead the columns are coalesced into logical date,
    which is good enough in most cases.
    
    (cherry picked from commit 22d52c00f6397fde8d97cf2479c0614671f5b5ba)
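
A condensed, runnable sketch of the new ordering expression, with a two-column stand-in for ``DagRun``:

    from sqlalchemy import Column, DateTime, Integer, func
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class DagRun(Base):  # minimal stand-in for airflow.models.dagrun.DagRun
        __tablename__ = "dag_run"
        id = Column(Integer, primary_key=True)
        execution_date = Column(DateTime)
        data_interval_start = Column(DateTime)

    def run_ordering_expr(name: str):
        # Pre-2.3 runs have NULL data interval columns; without COALESCE those
        # rows would all sort together, so fall back to execution_date.
        expr = DagRun.__table__.columns[name]
        if name in ("data_interval_start", "data_interval_end"):
            expr = func.coalesce(expr, DagRun.execution_date)
        return expr.desc()

    print(run_ordering_expr("data_interval_start"))
    # coalesce(dag_run.data_interval_start, dag_run.execution_date) DESC
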
---
 airflow/www/utils.py | 44 ++++++++++++++++++++++++++++++++++++++------
 airflow/www/views.py |  5 +----
 2 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index 3429d6a140..0aaaf2b26e 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -20,10 +20,9 @@ from __future__ import annotations
 import json
 import textwrap
 import time
-from typing import Any
+from typing import TYPE_CHECKING, Any, Sequence
 from urllib.parse import urlencode
 
-import sqlalchemy as sqla
 from flask import request, url_for
 from flask.helpers import flash
 from flask_appbuilder.forms import FieldConverter
@@ -37,11 +36,12 @@ from markupsafe import Markup
 from pendulum.datetime import DateTime
 from pygments import highlight, lexers
 from pygments.formatters import HtmlFormatter
+from sqlalchemy import func, types
 from sqlalchemy.ext.associationproxy import AssociationProxy
 
-from airflow import models
 from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.models import errors
+from airflow.models.dagrun import DagRun
 from airflow.models.dagwarning import DagWarning
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
@@ -51,6 +51,10 @@ from airflow.utils.state import State, TaskInstanceState
 from airflow.www.forms import DateTimeWithTimezoneField
 from airflow.www.widgets import AirflowDateTimePickerWidget
 
+if TYPE_CHECKING:
+    from sqlalchemy.orm.query import Query
+    from sqlalchemy.sql.operators import ColumnOperators
+
 
 def datetime_to_string(value: DateTime | None) -> str | None:
     if value is None:
@@ -129,7 +133,7 @@ def get_mapped_summary(parent_instance, task_instances):
     }
 
 
-def encode_dag_run(dag_run: models.DagRun | None) -> dict[str, Any] | None:
+def encode_dag_run(dag_run: DagRun | None) -> dict[str, Any] | None:
     if not dag_run:
         return None
 
@@ -436,6 +440,34 @@ def dag_run_link(attr):
     return Markup('<a href="{url}">{run_id}</a>').format(url=url, run_id=run_id)
 
 
+def _get_run_ordering_expr(name: str) -> ColumnOperators:
+    expr = DagRun.__table__.columns[name]
+    # Data interval columns are NULL for runs created before 2.3, but SQL's
+    # NULL-sorting logic would make those old runs always appear first. In a
+    # perfect world we'd want to sort by ``get_run_data_interval()``, but that's
+    # not efficient, so instead the columns are coalesced into execution_date,
+    # which is good enough in most cases.
+    if name in ("data_interval_start", "data_interval_end"):
+        expr = func.coalesce(expr, DagRun.execution_date)
+    return expr.desc()
+
+
+def sorted_dag_runs(query: Query, *, ordering: Sequence[str], limit: int) -> Sequence[DagRun]:
+    """Produce DAG runs sorted by specified columns.
+
+    :param query: An ORM query object against *DagRun*.
+    :param ordering: Column names to sort the runs by. These should generally
+        come from a timetable's ``run_ordering``.
+    :param limit: Number of runs to limit to.
+    :return: A list of DagRun objects ordered by the specified columns. The list
+        contains only the *last* objects, but in *ascending* order.
+    """
+    ordering_exprs = (_get_run_ordering_expr(name) for name in ordering)
+    runs = query.order_by(*ordering_exprs, DagRun.id.desc()).limit(limit).all()
+    runs.reverse()
+    return runs
+
+
 def format_map_index(attr: dict) -> str:
     """Format map index for list columns in model view."""
     value = attr['map_index']
@@ -651,7 +683,7 @@ class CustomSQLAInterface(SQLAInterface):
             obj = self.list_columns[col_name].type
             return (
                 isinstance(obj, UtcDateTime)
-                or isinstance(obj, sqla.types.TypeDecorator)
+                or isinstance(obj, types.TypeDecorator)
                 and isinstance(obj.impl, UtcDateTime)
             )
         return False
@@ -664,7 +696,7 @@ class CustomSQLAInterface(SQLAInterface):
             obj = self.list_columns[col_name].type
             return (
                 isinstance(obj, ExtendedJSON)
-                or isinstance(obj, sqla.types.TypeDecorator)
+                or isinstance(obj, types.TypeDecorator)
                 and isinstance(obj.impl, ExtendedJSON)
             )
         return False
diff --git a/airflow/www/views.py b/airflow/www/views.py
index cc85e23b11..b1d3c1209b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3454,10 +3454,7 @@ class Airflow(AirflowBaseView):
             if run_state:
                 query = query.filter(DagRun.state == run_state)
 
-            ordering = (DagRun.__table__.columns[name].desc() for name in dag.timetable.run_ordering)
-            dag_runs = query.order_by(*ordering, DagRun.id.desc()).limit(num_runs).all()
-            dag_runs.reverse()
-
+            dag_runs = wwwutils.sorted_dag_runs(query, ordering=dag.timetable.run_ordering, limit=num_runs)
             encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs]
             data = {
                 'groups': dag_to_grid(dag, dag_runs, session),


[airflow] 03/13: Clarify owner links document (#26515)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2598efca1da1f2b6b7839f11589043f3d0499fe2
Author: Bas Harenslak <Ba...@users.noreply.github.com>
AuthorDate: Tue Sep 20 17:44:46 2022 +0200

    Clarify owner links document (#26515)
    
    This PR fixes several code errors in the owner links doc, rewords some sentences for clarification, and adds examples for all options.
    
    (cherry picked from commit 14c071cac1fa2dbe0cc8d99f8bfedabaa8af2cc3)
---
 docs/apache-airflow/howto/add-owner-links.rst | 29 ++++++++++++++-------------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/docs/apache-airflow/howto/add-owner-links.rst b/docs/apache-airflow/howto/add-owner-links.rst
index 4f6525ff66..8c18ca5fd2 100644
--- a/docs/apache-airflow/howto/add-owner-links.rst
+++ b/docs/apache-airflow/howto/add-owner-links.rst
@@ -23,26 +23,27 @@ Add Owner Links to DAG
 
 .. versionadded:: 2.4.0
 
-You can pass the ``owner_link`` parameter for your DAG object, which will make the owner to become a clickable link
-in the main DAGs view page.
-You can use it to set a custom HTTP link (for example, the owner's Slack channel), or use a
-`maitlo <https://en.wikipedia.org/wiki/Mailto>`_ link to have an automated email message (up to 500 characters).
+You can set the ``owner_links`` argument on your DAG object, which will make the owner a clickable link in the
+main DAGs view page instead of a search filter.
 
-Example:
-In your DAG file, add a ``owners_link`` parameter to the DAG object that will hold a dictionary of an owner and it's link.
-After that, define a task that will use this owner, and the link in the DAGs view will become clickable
+Two options are supported:
+
+* An HTTP link (e.g. ``https://www.example.com``) which opens the webpage in your default web browser
+* A `mailto <https://en.wikipedia.org/wiki/Mailto>`_ link (e.g. ``mailto:example@airflow.com``) which opens your default email client to send an email to the specified address
+
+In your DAG, set the ``owner_links`` argument to a dictionary mapping an owner (key) to its link (value).
+Next, define a task using this owner; the owner shown in the DAGs view will then link to the specified address.
 
 .. code-block:: python
+  :emphasize-lines: 5
 
-  dag = DAG(
+  with DAG(
       dag_id="example_dag_owners",
-      schedule="0 0 * * *",
       start_date=datetime(2022, 8, 5),
-      owner_links={"airflow": "https://airflow.apache.org/"},
-  )
-
-  with dag:
-      bash_task = BashOperator(task_id='task_using_linked_owner', bash_command='echo 1', owner='airflow')
+      schedule="0 0 * * *",
+      owner_links={"airflow": "https://airflow.apache.org"},
+  ):
+      BashOperator(task_id="task_using_linked_owner", bash_command="echo 1", owner="airflow")
 
 **Screenshot**:
 


[airflow] 06/13: Log warning when secret backend kwargs is invalid (#26580)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0308b177fc4911cd1a01af62e2b620ffeaf9ab64
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Thu Sep 22 15:20:47 2022 +0800

    Log warning when secret backend kwargs is invalid (#26580)
    
    This should help debugging a bit.
    
    (cherry picked from commit 2060ed43955e09792cb9888deea0da239b7eee09)
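
A standalone sketch of the new behavior, using plain ``json`` in place of ``conf.getjson`` (which raises ``AirflowConfigException`` on bad JSON):

    import json
    import logging

    log = logging.getLogger(__name__)

    def parse_backend_kwargs(raw: str) -> dict:
        # Invalid JSON or a non-dict value no longer fails silently: log a
        # warning and fall back to empty kwargs, mirroring the diff below.
        try:
            kwargs = json.loads(raw) if raw else {}
            if not isinstance(kwargs, dict):
                raise ValueError("not a dict")
        except json.JSONDecodeError:
            log.warning("Failed to parse [secrets] backend_kwargs as JSON, defaulting to no kwargs.")
            return {}
        except ValueError:
            log.warning("Failed to parse [secrets] backend_kwargs into a dict, defaulting to no kwargs.")
            return {}
        return kwargs

    print(parse_backend_kwargs('{"url": "http://vault:8200"}'))  # {'url': ...}
    print(parse_backend_kwargs("oops"))    # warning -> {}
    print(parse_backend_kwargs("[1, 2]"))  # warning -> {}
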
---
 airflow/configuration.py | 24 ++++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index 77164ab904..905564ac8d 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -1542,15 +1542,23 @@ def get_custom_secret_backend() -> BaseSecretsBackend | None:
     """Get Secret Backend if defined in airflow.cfg"""
     secrets_backend_cls = conf.getimport(section='secrets', key='backend')
 
-    if secrets_backend_cls:
-        try:
-            backends: Any = conf.get(section='secrets', key='backend_kwargs', fallback='{}')
-            alternative_secrets_config_dict = json.loads(backends)
-        except JSONDecodeError:
-            alternative_secrets_config_dict = {}
+    if not secrets_backend_cls:
+        return None
 
-        return secrets_backend_cls(**alternative_secrets_config_dict)
-    return None
+    try:
+        backend_kwargs = conf.getjson(section='secrets', key='backend_kwargs')
+        if not backend_kwargs:
+            backend_kwargs = {}
+        elif not isinstance(backend_kwargs, dict):
+            raise ValueError("not a dict")
+    except AirflowConfigException:
+        log.warning("Failed to parse [secrets] backend_kwargs as JSON, defaulting to no kwargs.")
+        backend_kwargs = {}
+    except ValueError:
+        log.warning("Failed to parse [secrets] backend_kwargs into a dict, defaulting to no kwargs.")
+        backend_kwargs = {}
+
+    return secrets_backend_cls(**backend_kwargs)
 
 
 def initialize_secrets_backends() -> list[BaseSecretsBackend]: